Skip to main content

arrow_ipc/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Arrow IPC File and Stream Writers
19//!
20//! # Notes
21//!
22//! [`FileWriter`] and [`StreamWriter`] have similar interfaces,
23//! however the [`FileWriter`] expects a reader that supports [`Seek`]ing
24//!
25//! [`Seek`]: std::io::Seek
26
27use std::cmp::min;
28use std::collections::HashMap;
29use std::io::{BufWriter, Write};
30use std::mem::size_of;
31use std::sync::Arc;
32
33use flatbuffers::FlatBufferBuilder;
34
35use arrow_array::builder::BufferBuilder;
36use arrow_array::cast::*;
37use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
38use arrow_array::*;
39use arrow_buffer::bit_util;
40use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ToByteSlice};
41use arrow_data::{ArrayData, ArrayDataBuilder, BufferSpec, layout};
42use arrow_schema::*;
43
44use crate::CONTINUATION_MARKER;
45use crate::compression::CompressionCodec;
46pub use crate::compression::CompressionContext;
47use crate::convert::IpcSchemaEncoder;
48
/// IPC write options used to control the behaviour of the [`IpcDataGenerator`]
///
/// Obtain an instance through [`IpcWriteOptions::default`] or
/// [`IpcWriteOptions::try_new`], then refine it with the
/// `try_with_compression` and `with_dictionary_handling` builder methods.
#[derive(Debug, Clone)]
pub struct IpcWriteOptions {
    /// Write padding after memory buffers to this multiple of bytes.
    /// Must be 8, 16, 32, or 64 - defaults to 64.
    ///
    /// `try_new` rejects any other value before narrowing it to `u8`.
    alignment: u8,
    /// The legacy format is for releases before 0.15.0, and uses metadata V4
    ///
    /// `try_new` refuses to combine this flag with metadata V5.
    write_legacy_ipc_format: bool,
    /// The metadata version to write. The Rust IPC writer supports V4+
    ///
    /// *Default versions per crate*
    ///
    /// When creating the default IpcWriteOptions, the following metadata versions are used:
    ///
    /// version 2.0.0: V4, with legacy format enabled
    /// version 4.0.0: V5
    metadata_version: crate::MetadataVersion,
    /// Compression, if desired. Will result in a runtime error
    /// if the corresponding feature is not enabled
    ///
    /// `None` (the default) writes uncompressed buffers; `try_with_compression`
    /// only accepts `Some(_)` when the metadata version is at least V5.
    batch_compression_type: Option<crate::CompressionType>,
    /// How to handle updating dictionaries in IPC messages
    dictionary_handling: DictionaryHandling,
}
72
/// A single buffer segment ready to be written to the output stream.
///
/// For the uncompressed path the original Arc-backed [`Buffer`] is stored
/// directly (zero copy). For the compressed path the compressed bytes are
/// owned by a scratch `Vec<u8>`.
///
/// Neither variant carries alignment padding; writers add it separately
/// after emitting the bytes of a segment.
enum EncodedBuffer {
    /// Uncompressed: Arc-backed reference to the original array buffer.
    Raw(Buffer),
    /// Compressed: owned scratch bytes produced by the codec.
    Compressed(Vec<u8>),
}
84
85impl EncodedBuffer {
86    fn as_slice(&self) -> &[u8] {
87        match self {
88            EncodedBuffer::Raw(b) => b.as_slice(),
89            EncodedBuffer::Compressed(v) => v.as_slice(),
90        }
91    }
92
93    fn len(&self) -> usize {
94        match self {
95            EncodedBuffer::Raw(b) => b.len(),
96            EncodedBuffer::Compressed(v) => v.len(),
97        }
98    }
99}
/// Accumulates the IPC metadata produced by [`write_array_data`].
///
/// `nodes` and `buffers` are serialised into the flatbuffer `RecordBatch` (or `DictionaryBatch`)
/// header. The companion [`IpcBodySink`] holds the actual encoded bytes.
///
/// Starts out empty via `Default`; one builder is used per encoded message.
#[derive(Default)]
struct IpcMetadataBuilder {
    /// Flatbuffer field-node entries, turned into the header's `nodes` vector.
    nodes: Vec<crate::FieldNode>,
    /// Flatbuffer buffer descriptors, turned into the header's `buffers` vector.
    buffers: Vec<crate::Buffer>,
}
109
/// Destination for the raw Arrow data bytes (the IPC message body) produced by [`write_array_data`].
///
/// The companion [`IpcMetadataBuilder`] accumulates the flatbuffer metadata
/// (offset + length of each buffer in the body); together they form a complete IPC message.
///
/// `Write` is the mode used when a message body must be materialised as one
/// `Vec<u8>` (e.g. `IpcDataGenerator::encode`); `Collect` is used by
/// `IpcDataGenerator::write`, which streams the segments to a writer afterwards.
enum IpcBodySink<'a> {
    /// Serialize buffer bytes (with padding) into a contiguous byte vec.
    Write(&'a mut Vec<u8>),
    /// Accumulate pre-encoded buffer segments for deferred zero-copy streaming.
    ///
    /// Padding is not stored here; whoever drains the segments must re-add it.
    Collect(&'a mut Vec<EncodedBuffer>),
}
120impl<'a> IpcBodySink<'a> {
121    /// Writes the encoded buffer to the sink.
122    pub fn write(&mut self, pad_len: usize, buffer: EncodedBuffer) {
123        match self {
124            IpcBodySink::Write(vec) => {
125                vec.extend_from_slice(buffer.as_slice());
126                vec.extend_from_slice(&PADDING[..pad_len]);
127            }
128            IpcBodySink::Collect(vec) => {
129                vec.push(buffer);
130            }
131        }
132    }
133}
134
/// Per-message sizes produced by [`IpcDataGenerator::write`].
///
/// [`FileWriter`] uses these to build the Block index entries required by the IPC footer for
/// random-access reads.
struct IpcWriteMetadata {
    /// Per-dictionary `(padded_header_len, body_len)` for each dictionary batch written
    /// before the record batch.
    ///
    /// One entry per dictionary message, in the order the messages were written.
    dictionary_block_sizes: Vec<(usize, usize)>,
    /// Flatbuffer header size including continuation prefix and alignment padding.
    padded_header_len: usize,
    /// Total length of the record-batch body including trailing alignment padding.
    body_len: usize,
}
148
149impl IpcWriteOptions {
150    /// Configures compression when writing IPC files.
151    ///
152    /// Will result in a runtime error if the corresponding feature
153    /// is not enabled
154    pub fn try_with_compression(
155        mut self,
156        batch_compression_type: Option<crate::CompressionType>,
157    ) -> Result<Self, ArrowError> {
158        self.batch_compression_type = batch_compression_type;
159
160        if self.batch_compression_type.is_some()
161            && self.metadata_version < crate::MetadataVersion::V5
162        {
163            return Err(ArrowError::InvalidArgumentError(
164                "Compression only supported in metadata v5 and above".to_string(),
165            ));
166        }
167        Ok(self)
168    }
169    /// Try to create IpcWriteOptions, checking for incompatible settings
170    pub fn try_new(
171        alignment: usize,
172        write_legacy_ipc_format: bool,
173        metadata_version: crate::MetadataVersion,
174    ) -> Result<Self, ArrowError> {
175        let is_alignment_valid =
176            alignment == 8 || alignment == 16 || alignment == 32 || alignment == 64;
177        if !is_alignment_valid {
178            return Err(ArrowError::InvalidArgumentError(
179                "Alignment should be 8, 16, 32, or 64.".to_string(),
180            ));
181        }
182        let alignment: u8 = u8::try_from(alignment).expect("range already checked");
183        match metadata_version {
184            crate::MetadataVersion::V1
185            | crate::MetadataVersion::V2
186            | crate::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
187                "Writing IPC metadata version 3 and lower not supported".to_string(),
188            )),
189            #[allow(deprecated)]
190            crate::MetadataVersion::V4 => Ok(Self {
191                alignment,
192                write_legacy_ipc_format,
193                metadata_version,
194                batch_compression_type: None,
195                dictionary_handling: DictionaryHandling::default(),
196            }),
197            crate::MetadataVersion::V5 => {
198                if write_legacy_ipc_format {
199                    Err(ArrowError::InvalidArgumentError(
200                        "Legacy IPC format only supported on metadata version 4".to_string(),
201                    ))
202                } else {
203                    Ok(Self {
204                        alignment,
205                        write_legacy_ipc_format,
206                        metadata_version,
207                        batch_compression_type: None,
208                        dictionary_handling: DictionaryHandling::default(),
209                    })
210                }
211            }
212            z => Err(ArrowError::InvalidArgumentError(format!(
213                "Unsupported crate::MetadataVersion {z:?}"
214            ))),
215        }
216    }
217
218    /// Configure how dictionaries are handled in IPC messages
219    pub fn with_dictionary_handling(mut self, dictionary_handling: DictionaryHandling) -> Self {
220        self.dictionary_handling = dictionary_handling;
221        self
222    }
223}
224
225impl Default for IpcWriteOptions {
226    fn default() -> Self {
227        Self {
228            alignment: 64,
229            write_legacy_ipc_format: false,
230            metadata_version: crate::MetadataVersion::V5,
231            batch_compression_type: None,
232            dictionary_handling: DictionaryHandling::default(),
233        }
234    }
235}
236
#[derive(Debug, Default)]
/// Handles low level details of encoding [`Array`] and [`Schema`] into the
/// [Arrow IPC Format].
///
/// The generator itself is stateless; per-stream state lives in the
/// [`DictionaryTracker`] and [`CompressionContext`] passed to each call.
///
/// # Example
/// ```
/// # fn run() {
/// # use std::sync::Arc;
/// # use arrow_array::UInt64Array;
/// # use arrow_array::RecordBatch;
/// # use arrow_ipc::writer::{CompressionContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
///
/// // Create a record batch
/// let batch = RecordBatch::try_from_iter(vec![
///  ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _)
/// ]).unwrap();
///
/// // Error if dictionary ids are replaced.
/// let error_on_replacement = true;
/// let options = IpcWriteOptions::default();
/// let mut dictionary_tracker = DictionaryTracker::new(error_on_replacement);
///
/// let mut compression_context = CompressionContext::default();
///
/// // encode the batch into zero or more encoded dictionaries
/// // and the data for the actual array.
/// let data_gen = IpcDataGenerator::default();
/// let (encoded_dictionaries, encoded_message) = data_gen
///   .encode(&batch, &mut dictionary_tracker, &options, &mut compression_context)
///   .unwrap();
/// # }
/// ```
///
/// [Arrow IPC Format]: https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc
pub struct IpcDataGenerator {}
272
273impl IpcDataGenerator {
274    /// Converts a schema to an IPC message along with `dictionary_tracker`
275    /// and returns it encoded inside [EncodedData] as a flatbuffer.
276    pub fn schema_to_bytes_with_dictionary_tracker(
277        &self,
278        schema: &Schema,
279        dictionary_tracker: &mut DictionaryTracker,
280        write_options: &IpcWriteOptions,
281    ) -> EncodedData {
282        let mut fbb = FlatBufferBuilder::new();
283        let schema = {
284            let fb = IpcSchemaEncoder::new()
285                .with_dictionary_tracker(dictionary_tracker)
286                .schema_to_fb_offset(&mut fbb, schema);
287            fb.as_union_value()
288        };
289
290        let mut message = crate::MessageBuilder::new(&mut fbb);
291        message.add_version(write_options.metadata_version);
292        message.add_header_type(crate::MessageHeader::Schema);
293        message.add_bodyLength(0);
294        message.add_header(schema);
295        // TODO: custom metadata
296        let data = message.finish();
297        fbb.finish(data, None);
298
299        let data = fbb.finished_data();
300        EncodedData {
301            ipc_message: data.to_vec(),
302            arrow_data: vec![],
303        }
304    }
305
306    fn _encode_dictionaries<I: Iterator<Item = i64>>(
307        &self,
308        column: &ArrayRef,
309        encoded_dictionaries: &mut Vec<EncodedData>,
310        dictionary_tracker: &mut DictionaryTracker,
311        write_options: &IpcWriteOptions,
312        dict_id: &mut I,
313        compression_context: &mut CompressionContext,
314    ) -> Result<(), ArrowError> {
315        match column.data_type() {
316            DataType::Struct(fields) => {
317                let s = as_struct_array(column);
318                for (field, column) in fields.iter().zip(s.columns()) {
319                    self.encode_dictionaries(
320                        field,
321                        column,
322                        encoded_dictionaries,
323                        dictionary_tracker,
324                        write_options,
325                        dict_id,
326                        compression_context,
327                    )?;
328                }
329            }
330            DataType::RunEndEncoded(_, values) => {
331                let data = column.to_data();
332                if data.child_data().len() != 2 {
333                    return Err(ArrowError::InvalidArgumentError(format!(
334                        "The run encoded array should have exactly two child arrays. Found {}",
335                        data.child_data().len()
336                    )));
337                }
338                // The run_ends array is not expected to be dictionary encoded. Hence encode dictionaries
339                // only for values array.
340                let values_array = make_array(data.child_data()[1].clone());
341                self.encode_dictionaries(
342                    values,
343                    &values_array,
344                    encoded_dictionaries,
345                    dictionary_tracker,
346                    write_options,
347                    dict_id,
348                    compression_context,
349                )?;
350            }
351            DataType::List(field) => {
352                let list = as_list_array(column);
353                self.encode_dictionaries(
354                    field,
355                    list.values(),
356                    encoded_dictionaries,
357                    dictionary_tracker,
358                    write_options,
359                    dict_id,
360                    compression_context,
361                )?;
362            }
363            DataType::LargeList(field) => {
364                let list = as_large_list_array(column);
365                self.encode_dictionaries(
366                    field,
367                    list.values(),
368                    encoded_dictionaries,
369                    dictionary_tracker,
370                    write_options,
371                    dict_id,
372                    compression_context,
373                )?;
374            }
375            DataType::ListView(field) => {
376                let list = column.as_list_view::<i32>();
377                self.encode_dictionaries(
378                    field,
379                    list.values(),
380                    encoded_dictionaries,
381                    dictionary_tracker,
382                    write_options,
383                    dict_id,
384                    compression_context,
385                )?;
386            }
387            DataType::LargeListView(field) => {
388                let list = column.as_list_view::<i64>();
389                self.encode_dictionaries(
390                    field,
391                    list.values(),
392                    encoded_dictionaries,
393                    dictionary_tracker,
394                    write_options,
395                    dict_id,
396                    compression_context,
397                )?;
398            }
399            DataType::FixedSizeList(field, _) => {
400                let list = column
401                    .as_any()
402                    .downcast_ref::<FixedSizeListArray>()
403                    .expect("Unable to downcast to fixed size list array");
404                self.encode_dictionaries(
405                    field,
406                    list.values(),
407                    encoded_dictionaries,
408                    dictionary_tracker,
409                    write_options,
410                    dict_id,
411                    compression_context,
412                )?;
413            }
414            DataType::Map(field, _) => {
415                let map_array = as_map_array(column);
416
417                let (keys, values) = match field.data_type() {
418                    DataType::Struct(fields) if fields.len() == 2 => (&fields[0], &fields[1]),
419                    _ => panic!("Incorrect field data type {:?}", field.data_type()),
420                };
421
422                // keys
423                self.encode_dictionaries(
424                    keys,
425                    map_array.keys(),
426                    encoded_dictionaries,
427                    dictionary_tracker,
428                    write_options,
429                    dict_id,
430                    compression_context,
431                )?;
432
433                // values
434                self.encode_dictionaries(
435                    values,
436                    map_array.values(),
437                    encoded_dictionaries,
438                    dictionary_tracker,
439                    write_options,
440                    dict_id,
441                    compression_context,
442                )?;
443            }
444            DataType::Union(fields, _) => {
445                let union = as_union_array(column);
446                for (type_id, field) in fields.iter() {
447                    let column = union.child(type_id);
448                    self.encode_dictionaries(
449                        field,
450                        column,
451                        encoded_dictionaries,
452                        dictionary_tracker,
453                        write_options,
454                        dict_id,
455                        compression_context,
456                    )?;
457                }
458            }
459            _ => (),
460        }
461
462        Ok(())
463    }
464
465    #[allow(clippy::too_many_arguments)]
466    fn encode_dictionaries<I: Iterator<Item = i64>>(
467        &self,
468        field: &Field,
469        column: &ArrayRef,
470        encoded_dictionaries: &mut Vec<EncodedData>,
471        dictionary_tracker: &mut DictionaryTracker,
472        write_options: &IpcWriteOptions,
473        dict_id_seq: &mut I,
474        compression_context: &mut CompressionContext,
475    ) -> Result<(), ArrowError> {
476        match column.data_type() {
477            DataType::Dictionary(_key_type, _value_type) => {
478                let dict_data = column.to_data();
479                let dict_values = &dict_data.child_data()[0];
480
481                let values = make_array(dict_data.child_data()[0].clone());
482
483                self._encode_dictionaries(
484                    &values,
485                    encoded_dictionaries,
486                    dictionary_tracker,
487                    write_options,
488                    dict_id_seq,
489                    compression_context,
490                )?;
491
492                // It's important to only take the dict_id at this point, because the dict ID
493                // sequence is assigned depth-first, so we need to first encode children and have
494                // them take their assigned dict IDs before we take the dict ID for this field.
495                let dict_id = dict_id_seq.next().ok_or_else(|| {
496                    ArrowError::IpcError(format!(
497                        "no dict id for field {:?}: field.data_type={:?}, column.data_type={:?}",
498                        field.name(),
499                        field.data_type(),
500                        column.data_type()
501                    ))
502                })?;
503
504                match dictionary_tracker.insert_column(
505                    dict_id,
506                    column,
507                    write_options.dictionary_handling,
508                )? {
509                    DictionaryUpdate::None => {}
510                    DictionaryUpdate::New | DictionaryUpdate::Replaced => {
511                        encoded_dictionaries.push(self.dictionary_batch_to_bytes(
512                            dict_id,
513                            dict_values,
514                            write_options,
515                            false,
516                            compression_context,
517                        )?);
518                    }
519                    DictionaryUpdate::Delta(data) => {
520                        encoded_dictionaries.push(self.dictionary_batch_to_bytes(
521                            dict_id,
522                            &data,
523                            write_options,
524                            true,
525                            compression_context,
526                        )?);
527                    }
528                }
529            }
530            _ => self._encode_dictionaries(
531                column,
532                encoded_dictionaries,
533                dictionary_tracker,
534                write_options,
535                dict_id_seq,
536                compression_context,
537            )?,
538        }
539
540        Ok(())
541    }
542
543    /// Encodes a batch to a number of [EncodedData] items (dictionary batches + the record batch).
544    /// The [DictionaryTracker] keeps track of dictionaries with new `dict_id`s  (so they are only sent once)
545    /// Make sure the [DictionaryTracker] is initialized at the start of the stream.
546    pub fn encode(
547        &self,
548        batch: &RecordBatch,
549        dictionary_tracker: &mut DictionaryTracker,
550        write_options: &IpcWriteOptions,
551        compression_context: &mut CompressionContext,
552    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
553        let encoded_dictionaries = self.encode_all_dicts(
554            batch,
555            dictionary_tracker,
556            write_options,
557            compression_context,
558        )?;
559        let mut arrow_data = Vec::new();
560        let (ipc_message, _, tail_pad) = self.record_batch_to_bytes(
561            batch,
562            write_options,
563            compression_context,
564            &mut IpcBodySink::Write(&mut arrow_data),
565        )?;
566        arrow_data.extend_from_slice(&PADDING[..tail_pad]);
567        Ok((
568            encoded_dictionaries,
569            EncodedData {
570                ipc_message,
571                arrow_data,
572            },
573        ))
574    }
575
576    /// Encode dictionary batches for all columns in `batch`.
577    fn encode_all_dicts(
578        &self,
579        batch: &RecordBatch,
580        dictionary_tracker: &mut DictionaryTracker,
581        write_options: &IpcWriteOptions,
582        compression_context: &mut CompressionContext,
583    ) -> Result<Vec<EncodedData>, ArrowError> {
584        let schema = batch.schema();
585        let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len());
586        let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter();
587        for (i, field) in schema.fields().iter().enumerate() {
588            self.encode_dictionaries(
589                field,
590                batch.column(i),
591                &mut encoded_dictionaries,
592                dictionary_tracker,
593                write_options,
594                &mut dict_id,
595                compression_context,
596            )?;
597        }
598        Ok(encoded_dictionaries)
599    }
600
601    /// Write dictionary batches and the record batch directly to `writer`, skipping the
602    /// intermediate body `Vec<u8>` allocations
603    /// Returns [`IpcWriteMetadata`] with the sizes needed to build footer blocks.
604    fn write<W: Write>(
605        &self,
606        batch: &RecordBatch,
607        dictionary_tracker: &mut DictionaryTracker,
608        write_options: &IpcWriteOptions,
609        compression_context: &mut CompressionContext,
610        writer: &mut W,
611    ) -> Result<IpcWriteMetadata, ArrowError> {
612        let encoded_dictionaries = self.encode_all_dicts(
613            batch,
614            dictionary_tracker,
615            write_options,
616            compression_context,
617        )?;
618
619        let mut dictionary_block_sizes = Vec::with_capacity(encoded_dictionaries.len());
620        for dict in encoded_dictionaries {
621            dictionary_block_sizes.push(write_message(&mut *writer, dict, write_options)?);
622        }
623
624        let capacity = batch
625            .columns()
626            .iter()
627            .map(|a| estimate_encoded_buffer_count(a.data_type()))
628            .sum();
629        let mut encoded_buffers: Vec<EncodedBuffer> = Vec::with_capacity(capacity);
630        let (ipc_message, body_len, tail_pad) = self.record_batch_to_bytes(
631            batch,
632            write_options,
633            compression_context,
634            &mut IpcBodySink::Collect(&mut encoded_buffers),
635        )?;
636
637        let alignment = write_options.alignment;
638        let a = usize::from(alignment - 1);
639        let prefix_size = if write_options.write_legacy_ipc_format {
640            4
641        } else {
642            8
643        };
644        let aligned_size = (ipc_message.len() + prefix_size + a) & !a;
645        write_continuation(
646            &mut *writer,
647            write_options,
648            (aligned_size - prefix_size) as i32,
649        )?;
650        writer.write_all(&ipc_message)?;
651        writer.write_all(&PADDING[..aligned_size - ipc_message.len() - prefix_size])?;
652        for enc in &encoded_buffers {
653            writer.write_all(enc.as_slice())?;
654            writer.write_all(&PADDING[..pad_to_alignment(alignment, enc.len())])?;
655        }
656        writer.write_all(&PADDING[..tail_pad])?;
657
658        Ok(IpcWriteMetadata {
659            dictionary_block_sizes,
660            padded_header_len: aligned_size,
661            body_len,
662        })
663    }
664
665    /// Encodes a batch to a number of [EncodedData] items (dictionary batches + the record batch).
666    /// The [DictionaryTracker] keeps track of dictionaries with new `dict_id`s  (so they are only sent once)
667    /// Make sure the [DictionaryTracker] is initialized at the start of the stream.
668    #[deprecated(since = "57.0.0", note = "Use `encode` instead")]
669    pub fn encoded_batch(
670        &self,
671        batch: &RecordBatch,
672        dictionary_tracker: &mut DictionaryTracker,
673        write_options: &IpcWriteOptions,
674    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
675        self.encode(
676            batch,
677            dictionary_tracker,
678            write_options,
679            &mut Default::default(),
680        )
681    }
682
683    /// Encodes a `RecordBatch` into a flatbuffer IPC message and fills `sink` with the
684    /// serialised buffer data.
685    ///
686    /// Returns `(ipc_message, body_len, tail_pad)`: the flatbuffer header bytes, the
687    /// total body length including trailing padding, and the trailing alignment padding byte count.
688    fn record_batch_to_bytes(
689        &self,
690        batch: &RecordBatch,
691        write_options: &IpcWriteOptions,
692        compression_context: &mut CompressionContext,
693        sink: &mut IpcBodySink<'_>,
694    ) -> Result<(Vec<u8>, usize, usize), ArrowError> {
695        let mut fbb = FlatBufferBuilder::new();
696
697        let batch_compression_type = write_options.batch_compression_type;
698
699        let compression = batch_compression_type.map(|batch_compression_type| {
700            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
701            c.add_method(crate::BodyCompressionMethod::BUFFER);
702            c.add_codec(batch_compression_type);
703            c.finish()
704        });
705
706        let compression_codec: Option<CompressionCodec> =
707            batch_compression_type.map(TryInto::try_into).transpose()?;
708
709        let alignment = write_options.alignment;
710        let mut variadic_buffer_counts = vec![];
711        let mut meta = IpcMetadataBuilder::default();
712        let mut offset = 0i64;
713
714        for array in batch.columns() {
715            let array_data = array.to_data();
716            offset = write_array_data(
717                &array_data,
718                &mut meta,
719                sink,
720                offset,
721                compression_codec,
722                compression_context,
723                write_options,
724            )?;
725            append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data);
726        }
727
728        let tail_pad = pad_to_alignment(alignment, offset as usize);
729        let body_len = offset as usize + tail_pad;
730
731        let buffers = fbb.create_vector(&meta.buffers);
732        let nodes = fbb.create_vector(&meta.nodes);
733        let variadic_buffer = if variadic_buffer_counts.is_empty() {
734            None
735        } else {
736            Some(fbb.create_vector(&variadic_buffer_counts))
737        };
738
739        let root = {
740            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
741            batch_builder.add_length(batch.num_rows() as i64);
742            batch_builder.add_nodes(nodes);
743            batch_builder.add_buffers(buffers);
744            if let Some(c) = compression {
745                batch_builder.add_compression(c);
746            }
747            if let Some(v) = variadic_buffer {
748                batch_builder.add_variadicBufferCounts(v);
749            }
750            batch_builder.finish().as_union_value()
751        };
752        // create an crate::Message
753        let mut message = crate::MessageBuilder::new(&mut fbb);
754        message.add_version(write_options.metadata_version);
755        message.add_header_type(crate::MessageHeader::RecordBatch);
756        message.add_bodyLength(body_len as i64);
757        message.add_header(root);
758        let root = message.finish();
759        fbb.finish(root, None);
760
761        Ok((fbb.finished_data().to_vec(), body_len, tail_pad))
762    }
763
764    /// Write dictionary values into two sets of bytes, one for the header (crate::Message) and the
765    /// other for the data
766    fn dictionary_batch_to_bytes(
767        &self,
768        dict_id: i64,
769        array_data: &ArrayData,
770        write_options: &IpcWriteOptions,
771        is_delta: bool,
772        compression_context: &mut CompressionContext,
773    ) -> Result<EncodedData, ArrowError> {
774        let mut fbb = FlatBufferBuilder::new();
775
776        let mut arrow_data: Vec<u8> = vec![];
777
778        // get the type of compression
779        let batch_compression_type = write_options.batch_compression_type;
780
781        let compression = batch_compression_type.map(|batch_compression_type| {
782            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
783            c.add_method(crate::BodyCompressionMethod::BUFFER);
784            c.add_codec(batch_compression_type);
785            c.finish()
786        });
787
788        let compression_codec: Option<CompressionCodec> = batch_compression_type
789            .map(|batch_compression_type| batch_compression_type.try_into())
790            .transpose()?;
791
792        let alignment = write_options.alignment;
793        let mut meta = IpcMetadataBuilder::default();
794        let mut sink = IpcBodySink::Write(&mut arrow_data);
795        let offset = write_array_data(
796            array_data,
797            &mut meta,
798            &mut sink,
799            0,
800            compression_codec,
801            compression_context,
802            write_options,
803        )?;
804
805        let mut variadic_buffer_counts = vec![];
806        append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);
807
808        // pad the tail of body data
809        let tail_pad = pad_to_alignment(alignment, offset as usize);
810        let body_len = offset as usize + tail_pad;
811        arrow_data.extend_from_slice(&PADDING[..tail_pad]);
812
813        // write data
814        let buffers = fbb.create_vector(&meta.buffers);
815        let nodes = fbb.create_vector(&meta.nodes);
816        let variadic_buffer = if variadic_buffer_counts.is_empty() {
817            None
818        } else {
819            Some(fbb.create_vector(&variadic_buffer_counts))
820        };
821
822        let root = {
823            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
824            batch_builder.add_length(array_data.len() as i64);
825            batch_builder.add_nodes(nodes);
826            batch_builder.add_buffers(buffers);
827            if let Some(c) = compression {
828                batch_builder.add_compression(c);
829            }
830            if let Some(v) = variadic_buffer {
831                batch_builder.add_variadicBufferCounts(v);
832            }
833            batch_builder.finish()
834        };
835
836        let root = {
837            let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut fbb);
838            batch_builder.add_id(dict_id);
839            batch_builder.add_data(root);
840            batch_builder.add_isDelta(is_delta);
841            batch_builder.finish().as_union_value()
842        };
843
844        let root = {
845            let mut message_builder = crate::MessageBuilder::new(&mut fbb);
846            message_builder.add_version(write_options.metadata_version);
847            message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
848            message_builder.add_bodyLength(body_len as i64);
849            message_builder.add_header(root);
850            message_builder.finish()
851        };
852
853        fbb.finish(root, None);
854        let finished_data = fbb.finished_data();
855
856        Ok(EncodedData {
857            ipc_message: finished_data.to_vec(),
858            arrow_data,
859        })
860    }
861}
862
863fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
864    match array.data_type() {
865        DataType::BinaryView | DataType::Utf8View => {
866            // The spec documents the counts only includes the variadic buffers, not the view/null buffers.
867            // https://arrow.apache.org/docs/format/Columnar.html#variadic-buffers
868            counts.push(array.buffers().len() as i64 - 1);
869        }
870        DataType::Dictionary(_, _) => {
871            // Do nothing
872            // Dictionary types are handled in `encode_dictionaries`.
873        }
874        _ => {
875            for child in array.child_data() {
876                append_variadic_buffer_counts(counts, child)
877            }
878        }
879    }
880}
881
882pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError> {
883    match arr.data_type() {
884        DataType::RunEndEncoded(k, _) => match k.data_type() {
885            DataType::Int16 => {
886                Ok(into_zero_offset_run_array(RunArray::<Int16Type>::from(arr))?.into_data())
887            }
888            DataType::Int32 => {
889                Ok(into_zero_offset_run_array(RunArray::<Int32Type>::from(arr))?.into_data())
890            }
891            DataType::Int64 => {
892                Ok(into_zero_offset_run_array(RunArray::<Int64Type>::from(arr))?.into_data())
893            }
894            d => unreachable!("Unexpected data type {d}"),
895        },
896        d => Err(ArrowError::InvalidArgumentError(format!(
897            "The given array is not a run array. Data type of given array: {d}"
898        ))),
899    }
900}
901
902// Returns a `RunArray` with zero offset and length matching the last value
903// in run_ends array.
904fn into_zero_offset_run_array<R: RunEndIndexType>(
905    run_array: RunArray<R>,
906) -> Result<RunArray<R>, ArrowError> {
907    let run_ends = run_array.run_ends();
908    if run_ends.offset() == 0 && run_ends.max_value() == run_ends.len() {
909        return Ok(run_array);
910    }
911
912    // The physical index of original run_ends array from which the `ArrayData`is sliced.
913    let start_physical_index = run_ends.get_start_physical_index();
914
915    // The physical index of original run_ends array until which the `ArrayData`is sliced.
916    let end_physical_index = run_ends.get_end_physical_index();
917
918    let physical_length = end_physical_index - start_physical_index + 1;
919
920    // build new run_ends array by subtracting offset from run ends.
921    let offset = R::Native::usize_as(run_ends.offset());
922    let mut builder = BufferBuilder::<R::Native>::new(physical_length);
923    for run_end_value in &run_ends.values()[start_physical_index..end_physical_index] {
924        builder.append(run_end_value.sub_wrapping(offset));
925    }
926    builder.append(R::Native::from_usize(run_array.len()).unwrap());
927    let new_run_ends = unsafe {
928        // Safety:
929        // The function builds a valid run_ends array and hence need not be validated.
930        ArrayDataBuilder::new(R::DATA_TYPE)
931            .len(physical_length)
932            .add_buffer(builder.finish())
933            .build_unchecked()
934    };
935
936    // build new values by slicing physical indices.
937    let new_values = run_array
938        .values()
939        .slice(start_physical_index, physical_length)
940        .into_data();
941
942    let builder = ArrayDataBuilder::new(run_array.data_type().clone())
943        .len(run_array.len())
944        .add_child_data(new_run_ends)
945        .add_child_data(new_values);
946    let array_data = unsafe {
947        // Safety:
948        //  This function builds a valid run array and hence can skip validation.
949        builder.build_unchecked()
950    };
951    Ok(array_data.into())
952}
953
/// Controls how dictionaries are handled in Arrow IPC messages.
///
/// The value is passed to [`DictionaryTracker::insert_column`], where it decides how a
/// dictionary that extends a previously tracked one is reported.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DictionaryHandling {
    /// Send the entire dictionary every time it is encountered (default)
    ///
    /// A dictionary that merely extends the previous one is reported as
    /// [`DictionaryUpdate::Replaced`] rather than as a delta.
    #[default]
    Resend,
    /// Send only new dictionary values since the last batch (delta encoding)
    ///
    /// When a dictionary is first encountered, the entire dictionary is sent.
    /// For subsequent batches, only values that are new (not previously sent)
    /// are transmitted with the `isDelta` flag set to true.
    Delta,
}
967
/// Describes what kind of update took place after a call to
/// [`DictionaryTracker::insert_column`].
#[derive(Debug, Clone)]
pub enum DictionaryUpdate {
    /// No dictionary was written, the dictionary was identical to what was already
    /// in the tracker.
    None,
    /// No dictionary with this ID was present in the tracker; the new one was stored
    New,
    /// Dictionary was replaced with the new data
    Replaced,
    /// Dictionary was updated, ArrayData is the delta between old and new, i.e. the
    /// values of the new dictionary that follow the previously tracked ones
    Delta(ArrayData),
}
981
/// Keeps track of dictionaries that have been written, to avoid emitting the same dictionary
/// multiple times.
///
/// Can optionally error if an update to an existing dictionary is attempted, which
/// isn't allowed in the `FileWriter`.
#[derive(Debug)]
pub struct DictionaryTracker {
    // NOTE: When adding fields, update the clear() method accordingly.
    // Most recently tracked dictionary array data, keyed by dictionary ID.
    written: HashMap<i64, ArrayData>,
    // Dictionary IDs handed out by `next_dict_id`, in the order they were assigned.
    dict_ids: Vec<i64>,
    // When true, replacing an already tracked dictionary is reported as an error.
    error_on_replacement: bool,
}
994
995impl DictionaryTracker {
996    /// Create a new [`DictionaryTracker`].
997    ///
998    /// If `error_on_replacement`
999    /// is true, an error will be generated if an update to an
1000    /// existing dictionary is attempted.
1001    pub fn new(error_on_replacement: bool) -> Self {
1002        #[allow(deprecated)]
1003        Self {
1004            written: HashMap::new(),
1005            dict_ids: Vec::new(),
1006            error_on_replacement,
1007        }
1008    }
1009
1010    /// Record and return the next dictionary ID.
1011    pub fn next_dict_id(&mut self) -> i64 {
1012        let next = self
1013            .dict_ids
1014            .last()
1015            .copied()
1016            .map(|i| i + 1)
1017            .unwrap_or_default();
1018
1019        self.dict_ids.push(next);
1020        next
1021    }
1022
1023    /// Return the sequence of dictionary IDs in the order they should be observed while
1024    /// traversing the schema
1025    pub fn dict_id(&mut self) -> &[i64] {
1026        &self.dict_ids
1027    }
1028
1029    /// Keep track of the dictionary with the given ID and values. Behavior:
1030    ///
1031    /// * If this ID has been written already and has the same data, return `Ok(false)` to indicate
1032    ///   that the dictionary was not actually inserted (because it's already been seen).
1033    /// * If this ID has been written already but with different data, and this tracker is
1034    ///   configured to return an error, return an error.
1035    /// * If the tracker has not been configured to error on replacement or this dictionary
1036    ///   has never been seen before, return `Ok(true)` to indicate that the dictionary was just
1037    ///   inserted.
1038    #[deprecated(since = "56.1.0", note = "Use `insert_column` instead")]
1039    pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> {
1040        let dict_data = column.to_data();
1041        let dict_values = &dict_data.child_data()[0];
1042
1043        // If a dictionary with this id was already emitted, check if it was the same.
1044        if let Some(last) = self.written.get(&dict_id) {
1045            if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
1046                // Same dictionary values => no need to emit it again
1047                return Ok(false);
1048            }
1049            if self.error_on_replacement {
1050                // If error on replacement perform a logical comparison
1051                if last.child_data()[0] == *dict_values {
1052                    // Same dictionary values => no need to emit it again
1053                    return Ok(false);
1054                }
1055                return Err(ArrowError::InvalidArgumentError(
1056                    "Dictionary replacement detected when writing IPC file format. \
1057                     Arrow IPC files only support a single dictionary for a given field \
1058                     across all batches."
1059                        .to_string(),
1060                ));
1061            }
1062        }
1063
1064        self.written.insert(dict_id, dict_data);
1065        Ok(true)
1066    }
1067
1068    /// Keep track of the dictionary with the given ID and values. The return
1069    /// value indicates what, if any, update to the internal map took place
1070    /// and how it should be interpreted based on the `dict_handling` parameter.
1071    ///
1072    /// # Returns
1073    ///
1074    /// * `Ok(Dictionary::New)` - If the dictionary was not previously written
1075    /// * `Ok(Dictionary::Replaced)` - If the dictionary was previously written
1076    ///   with completely different data, or if the data is a delta of the existing,
1077    ///   but with `dict_handling` set to `DictionaryHandling::Resend`
1078    /// * `Ok(Dictionary::Delta)` - If the dictionary was previously written, but
1079    ///   the new data is a delta of the old and the `dict_handling` is set to
1080    ///   `DictionaryHandling::Delta`
1081    /// * `Err(e)` - If the dictionary was previously written with different data,
1082    ///   and `error_on_replacement` is set to `true`.
1083    pub fn insert_column(
1084        &mut self,
1085        dict_id: i64,
1086        column: &ArrayRef,
1087        dict_handling: DictionaryHandling,
1088    ) -> Result<DictionaryUpdate, ArrowError> {
1089        let new_data = column.to_data();
1090        let new_values = &new_data.child_data()[0];
1091
1092        // If there is no existing dictionary with this ID, we always insert
1093        let Some(old) = self.written.get(&dict_id) else {
1094            self.written.insert(dict_id, new_data);
1095            return Ok(DictionaryUpdate::New);
1096        };
1097
1098        // Fast path - If the array data points to the same buffer as the
1099        // existing then they're the same.
1100        let old_values = &old.child_data()[0];
1101        if ArrayData::ptr_eq(old_values, new_values) {
1102            return Ok(DictionaryUpdate::None);
1103        }
1104
1105        // Slow path - Compare the dictionaries value by value
1106        let comparison = compare_dictionaries(old_values, new_values);
1107        if matches!(comparison, DictionaryComparison::Equal) {
1108            return Ok(DictionaryUpdate::None);
1109        }
1110
1111        const REPLACEMENT_ERROR: &str = "Dictionary replacement detected when writing IPC file format. \
1112                 Arrow IPC files only support a single dictionary for a given field \
1113                 across all batches.";
1114
1115        match comparison {
1116            DictionaryComparison::NotEqual => {
1117                if self.error_on_replacement {
1118                    return Err(ArrowError::InvalidArgumentError(
1119                        REPLACEMENT_ERROR.to_string(),
1120                    ));
1121                }
1122
1123                self.written.insert(dict_id, new_data);
1124                Ok(DictionaryUpdate::Replaced)
1125            }
1126            DictionaryComparison::Delta => match dict_handling {
1127                DictionaryHandling::Resend => {
1128                    if self.error_on_replacement {
1129                        return Err(ArrowError::InvalidArgumentError(
1130                            REPLACEMENT_ERROR.to_string(),
1131                        ));
1132                    }
1133
1134                    self.written.insert(dict_id, new_data);
1135                    Ok(DictionaryUpdate::Replaced)
1136                }
1137                DictionaryHandling::Delta => {
1138                    let delta =
1139                        new_values.slice(old_values.len(), new_values.len() - old_values.len());
1140                    self.written.insert(dict_id, new_data);
1141                    Ok(DictionaryUpdate::Delta(delta))
1142                }
1143            },
1144            DictionaryComparison::Equal => unreachable!("Already checked equal case"),
1145        }
1146    }
1147
1148    /// Clears the state of the dictionary tracker.
1149    ///
1150    /// This allows the dictionary tracker to be reused for a new IPC stream while avoiding the
1151    /// allocation cost of creating a new instance. This method should not be called if
1152    /// the dictionary tracker will be used to continue writing to an existing IPC stream.
1153    pub fn clear(&mut self) {
1154        self.dict_ids.clear();
1155        self.written.clear();
1156    }
1157}
1158
/// Describes how two dictionary arrays compare to each other.
///
/// Produced by `compare_dictionaries`, which compares an `old` array against a `new` one.
#[derive(Debug, Clone)]
enum DictionaryComparison {
    /// Neither a delta, nor an exact match
    NotEqual,
    /// Exact element-wise match
    Equal,
    /// The two arrays are dictionary deltas of each other, meaning the first
    /// (old) is a prefix of the second (new), which is strictly longer.
    Delta,
}
1170
1171// Compares two dictionaries and returns a [`DictionaryComparison`].
1172fn compare_dictionaries(old: &ArrayData, new: &ArrayData) -> DictionaryComparison {
1173    // Check for exact match
1174    let existing_len = old.len();
1175    let new_len = new.len();
1176    if existing_len == new_len {
1177        if *old == *new {
1178            return DictionaryComparison::Equal;
1179        } else {
1180            return DictionaryComparison::NotEqual;
1181        }
1182    }
1183
1184    // Can't be a delta if the new is shorter than the existing
1185    if new_len < existing_len {
1186        return DictionaryComparison::NotEqual;
1187    }
1188
1189    // Check for delta
1190    if new.slice(0, existing_len) == *old {
1191        return DictionaryComparison::Delta;
1192    }
1193
1194    DictionaryComparison::NotEqual
1195}
1196
1197/// Arrow File Writer
1198///
1199/// Writes Arrow [`RecordBatch`]es in the [IPC File Format].
1200///
1201/// # See Also
1202///
1203/// * [`StreamWriter`] for writing IPC Streams
1204///
1205/// # Example
1206/// ```
1207/// # use arrow_array::record_batch;
1208/// # use arrow_ipc::writer::FileWriter;
1209/// # let mut file = vec![]; // mimic a file for the example
1210/// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
1211/// // create a new writer, the schema must be known in advance
1212/// let mut writer = FileWriter::try_new(&mut file, &batch.schema()).unwrap();
1213/// // write each batch to the underlying writer
1214/// writer.write(&batch).unwrap();
1215/// // When all batches are written, call finish to flush all buffers
1216/// writer.finish().unwrap();
1217/// ```
1218/// [IPC File Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format
pub struct FileWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// A reference to the schema, used in validating record batches
    schema: SchemaRef,
    /// Running byte offset at which the next block will be written. It is recorded in
    /// each footer block so that blocks can be located for random access
    block_offsets: usize,
    /// Dictionary blocks that will be written as part of the IPC footer
    dictionary_blocks: Vec<crate::Block>,
    /// Record blocks that will be written as part of the IPC footer
    record_blocks: Vec<crate::Block>,
    /// Whether the writer footer has been written, and the writer is finished
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,
    /// User level customized metadata
    custom_metadata: HashMap<String, String>,

    /// Generator used to encode the schema and to write record batches
    data_gen: IpcDataGenerator,

    /// Compression state handed to `data_gen` on every batch write
    compression_context: CompressionContext,
}
1243
1244impl<W: Write> FileWriter<BufWriter<W>> {
1245    /// Try to create a new file writer with the writer wrapped in a BufWriter.
1246    ///
1247    /// See [`FileWriter::try_new`] for an unbuffered version.
1248    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1249        Self::try_new(BufWriter::new(writer), schema)
1250    }
1251}
1252
1253impl<W: Write> FileWriter<W> {
1254    /// Try to create a new writer, with the schema written as part of the header
1255    ///
1256    /// Note the created writer is not buffered. See [`FileWriter::try_new_buffered`] for details.
1257    ///
1258    /// # Errors
1259    ///
1260    /// An ['Err'](Result::Err) may be returned if writing the header to the writer fails.
1261    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1262        let write_options = IpcWriteOptions::default();
1263        Self::try_new_with_options(writer, schema, write_options)
1264    }
1265
1266    /// Try to create a new writer with IpcWriteOptions
1267    ///
1268    /// Note the created writer is not buffered. See [`FileWriter::try_new_buffered`] for details.
1269    ///
1270    /// # Errors
1271    ///
1272    /// An ['Err'](Result::Err) may be returned if writing the header to the writer fails.
1273    pub fn try_new_with_options(
1274        mut writer: W,
1275        schema: &Schema,
1276        write_options: IpcWriteOptions,
1277    ) -> Result<Self, ArrowError> {
1278        let data_gen = IpcDataGenerator::default();
1279        // write magic to header aligned on alignment boundary
1280        let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
1281        let header_size = super::ARROW_MAGIC.len() + pad_len;
1282        writer.write_all(&super::ARROW_MAGIC)?;
1283        writer.write_all(&PADDING[..pad_len])?;
1284        // write the schema, set the written bytes to the schema + header
1285        let mut dictionary_tracker = DictionaryTracker::new(true);
1286        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
1287            schema,
1288            &mut dictionary_tracker,
1289            &write_options,
1290        );
1291        let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
1292        Ok(Self {
1293            writer,
1294            write_options,
1295            schema: Arc::new(schema.clone()),
1296            block_offsets: meta + data + header_size,
1297            dictionary_blocks: vec![],
1298            record_blocks: vec![],
1299            finished: false,
1300            dictionary_tracker,
1301            custom_metadata: HashMap::new(),
1302            data_gen,
1303            compression_context: CompressionContext::default(),
1304        })
1305    }
1306
1307    /// Adds a key-value pair to the [FileWriter]'s custom metadata
1308    pub fn write_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
1309        self.custom_metadata.insert(key.into(), value.into());
1310    }
1311
1312    /// Write a record batch to the file
1313    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1314        if self.finished {
1315            return Err(ArrowError::IpcError(
1316                "Cannot write record batch to file writer as it is closed".to_string(),
1317            ));
1318        }
1319
1320        let meta = self.data_gen.write(
1321            batch,
1322            &mut self.dictionary_tracker,
1323            &self.write_options,
1324            &mut self.compression_context,
1325            &mut self.writer,
1326        )?;
1327
1328        for (header_len, body_len) in meta.dictionary_block_sizes {
1329            let block = crate::Block::new(
1330                self.block_offsets as i64,
1331                header_len as i32,
1332                body_len as i64,
1333            );
1334            self.dictionary_blocks.push(block);
1335            self.block_offsets += header_len + body_len;
1336        }
1337
1338        // add a record block for the footer
1339        let block = crate::Block::new(
1340            self.block_offsets as i64,
1341            meta.padded_header_len as i32,
1342            meta.body_len as i64,
1343        );
1344        self.record_blocks.push(block);
1345        self.block_offsets += meta.padded_header_len + meta.body_len;
1346        Ok(())
1347    }
1348
1349    /// Write footer and closing tag, then mark the writer as done
1350    pub fn finish(&mut self) -> Result<(), ArrowError> {
1351        if self.finished {
1352            return Err(ArrowError::IpcError(
1353                "Cannot write footer to file writer as it is closed".to_string(),
1354            ));
1355        }
1356
1357        // write EOS
1358        write_continuation(&mut self.writer, &self.write_options, 0)?;
1359
1360        let mut fbb = FlatBufferBuilder::new();
1361        let dictionaries = fbb.create_vector(&self.dictionary_blocks);
1362        let record_batches = fbb.create_vector(&self.record_blocks);
1363
1364        // dictionaries are already written, so we can reset dictionary tracker to reuse for schema
1365        self.dictionary_tracker.clear();
1366        let schema = IpcSchemaEncoder::new()
1367            .with_dictionary_tracker(&mut self.dictionary_tracker)
1368            .schema_to_fb_offset(&mut fbb, &self.schema);
1369        let fb_custom_metadata = (!self.custom_metadata.is_empty())
1370            .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));
1371
1372        let root = {
1373            let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
1374            footer_builder.add_version(self.write_options.metadata_version);
1375            footer_builder.add_schema(schema);
1376            footer_builder.add_dictionaries(dictionaries);
1377            footer_builder.add_recordBatches(record_batches);
1378            if let Some(fb_custom_metadata) = fb_custom_metadata {
1379                footer_builder.add_custom_metadata(fb_custom_metadata);
1380            }
1381            footer_builder.finish()
1382        };
1383        fbb.finish(root, None);
1384        let footer_data = fbb.finished_data();
1385        self.writer.write_all(footer_data)?;
1386        self.writer
1387            .write_all(&(footer_data.len() as i32).to_le_bytes())?;
1388        self.writer.write_all(&super::ARROW_MAGIC)?;
1389        self.writer.flush()?;
1390        self.finished = true;
1391
1392        Ok(())
1393    }
1394
1395    /// Returns the arrow [`SchemaRef`] for this arrow file.
1396    pub fn schema(&self) -> &SchemaRef {
1397        &self.schema
1398    }
1399
1400    /// Gets a reference to the underlying writer.
1401    pub fn get_ref(&self) -> &W {
1402        &self.writer
1403    }
1404
1405    /// Gets a mutable reference to the underlying writer.
1406    ///
1407    /// It is inadvisable to directly write to the underlying writer.
1408    pub fn get_mut(&mut self) -> &mut W {
1409        &mut self.writer
1410    }
1411
1412    /// Flush the underlying writer.
1413    ///
1414    /// Both the BufWriter and the underlying writer are flushed.
1415    pub fn flush(&mut self) -> Result<(), ArrowError> {
1416        self.writer.flush()?;
1417        Ok(())
1418    }
1419
1420    /// Unwraps the underlying writer.
1421    ///
1422    /// The writer is flushed and the FileWriter is finished before returning.
1423    ///
1424    /// # Errors
1425    ///
1426    /// An ['Err'](Result::Err) may be returned if an error occurs while finishing the StreamWriter
1427    /// or while flushing the writer.
1428    pub fn into_inner(mut self) -> Result<W, ArrowError> {
1429        if !self.finished {
1430            // `finish` flushes the writer.
1431            self.finish()?;
1432        }
1433        Ok(self.writer)
1434    }
1435}
1436
/// [`RecordBatchWriter`] implementation that forwards to the inherent [`FileWriter`] methods.
impl<W: Write> RecordBatchWriter for FileWriter<W> {
    /// Writes `batch` by delegating to [`FileWriter::write`].
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    /// Consumes the writer and finishes the file by delegating to [`FileWriter::finish`].
    ///
    /// Like `finish`, this returns an error if the writer has already been finished.
    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}
1446
1447/// Arrow Stream Writer
1448///
1449/// Writes Arrow [`RecordBatch`]es to bytes using the [IPC Streaming Format].
1450///
1451/// # See Also
1452///
1453/// * [`FileWriter`] for writing IPC Files
1454///
1455/// # Example - Basic usage
1456/// ```
1457/// # use arrow_array::record_batch;
1458/// # use arrow_ipc::writer::StreamWriter;
1459/// # let mut stream = vec![]; // mimic a stream for the example
1460/// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
1461/// // create a new writer, the schema must be known in advance
1462/// let mut writer = StreamWriter::try_new(&mut stream, &batch.schema()).unwrap();
1463/// // write each batch to the underlying stream
1464/// writer.write(&batch).unwrap();
1465/// // When all batches are written, call finish to flush all buffers
1466/// writer.finish().unwrap();
1467/// ```
1468/// # Example - Efficient delta dictionaries
1469/// ```
1470/// # use arrow_array::record_batch;
1471/// # use arrow_ipc::writer::{StreamWriter, IpcWriteOptions};
1472/// # use arrow_ipc::writer::DictionaryHandling;
1473/// # use arrow_schema::{DataType, Field, Schema, SchemaRef};
1474/// # use arrow_array::{
1475/// #    builder::StringDictionaryBuilder, types::Int32Type, Array, ArrayRef, DictionaryArray,
1476/// #    RecordBatch, StringArray,
1477/// # };
1478/// # use std::sync::Arc;
1479///
1480/// let schema = Arc::new(Schema::new(vec![Field::new(
1481///    "col1",
1482///    DataType::Dictionary(Box::from(DataType::Int32), Box::from(DataType::Utf8)),
1483///    true,
1484/// )]));
1485///
1486/// let mut builder = StringDictionaryBuilder::<arrow_array::types::Int32Type>::new();
1487///
1488/// // `finish_preserve_values` will keep the dictionary values along with their
1489/// // key assignments so that they can be re-used in the next batch.
1490/// builder.append("a").unwrap();
1491/// builder.append("b").unwrap();
1492/// let array1 = builder.finish_preserve_values();
1493/// let batch1 = RecordBatch::try_new(schema.clone(), vec![Arc::new(array1) as ArrayRef]).unwrap();
1494///
1495/// // In this batch, 'a' will have the same dictionary key as 'a' in the previous batch,
1496/// // and 'd' will take the next available key.
1497/// builder.append("a").unwrap();
1498/// builder.append("d").unwrap();
1499/// let array2 = builder.finish_preserve_values();
1500/// let batch2 = RecordBatch::try_new(schema.clone(), vec![Arc::new(array2) as ArrayRef]).unwrap();
1501///
1502/// let mut stream = vec![];
1503/// // You must set `.with_dictionary_handling(DictionaryHandling::Delta)` to
1504/// // enable delta dictionaries in the writer
1505/// let options = IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta);
1506/// let mut writer = StreamWriter::try_new_with_options(&mut stream, &schema, options).unwrap();
1507///
1508/// // When writing the first batch, a dictionary message with 'a' and 'b' will be written
1509/// // prior to the record batch.
1510/// writer.write(&batch1).unwrap();
1511/// // With the second batch only a delta dictionary with 'd' will be written
1512/// // prior to the record batch. This is only possible with `finish_preserve_values`.
1513/// // Without it, 'a' and 'd' in this batch would have different keys than the
1514/// // first batch and so we'd have to send a replacement dictionary with new keys
1515/// // for both.
1516/// writer.write(&batch2).unwrap();
1517/// writer.finish().unwrap();
1518/// ```
1519/// [IPC Streaming Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
pub struct StreamWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// Whether the writer is finished; once set, writing further record batches
    /// returns an error
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,

    /// Generator used to encode the schema and to write record batches
    data_gen: IpcDataGenerator,

    /// Compression state handed to `data_gen` on every batch write
    compression_context: CompressionContext,
}
1534
1535impl<W: Write> StreamWriter<BufWriter<W>> {
1536    /// Try to create a new stream writer with the writer wrapped in a BufWriter.
1537    ///
1538    /// See [`StreamWriter::try_new`] for an unbuffered version.
1539    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1540        Self::try_new(BufWriter::new(writer), schema)
1541    }
1542}
1543
1544impl<W: Write> StreamWriter<W> {
1545    /// Try to create a new writer, with the schema written as part of the header.
1546    ///
1547    /// Note that there is no internal buffering. See also [`StreamWriter::try_new_buffered`].
1548    ///
1549    /// # Errors
1550    ///
1551    /// An ['Err'](Result::Err) may be returned if writing the header to the writer fails.
1552    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1553        let write_options = IpcWriteOptions::default();
1554        Self::try_new_with_options(writer, schema, write_options)
1555    }
1556
1557    /// Try to create a new writer with [`IpcWriteOptions`].
1558    ///
1559    /// # Errors
1560    ///
1561    /// An ['Err'](Result::Err) may be returned if writing the header to the writer fails.
1562    pub fn try_new_with_options(
1563        mut writer: W,
1564        schema: &Schema,
1565        write_options: IpcWriteOptions,
1566    ) -> Result<Self, ArrowError> {
1567        let data_gen = IpcDataGenerator::default();
1568        let mut dictionary_tracker = DictionaryTracker::new(false);
1569
1570        // write the schema, set the written bytes to the schema
1571        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
1572            schema,
1573            &mut dictionary_tracker,
1574            &write_options,
1575        );
1576        write_message(&mut writer, encoded_message, &write_options)?;
1577        Ok(Self {
1578            writer,
1579            write_options,
1580            finished: false,
1581            dictionary_tracker,
1582            data_gen,
1583            compression_context: CompressionContext::default(),
1584        })
1585    }
1586
1587    /// Write a record batch to the stream
1588    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1589        if self.finished {
1590            return Err(ArrowError::IpcError(
1591                "Cannot write record batch to stream writer as it is closed".to_string(),
1592            ));
1593        }
1594
1595        self.data_gen.write(
1596            batch,
1597            &mut self.dictionary_tracker,
1598            &self.write_options,
1599            &mut self.compression_context,
1600            &mut self.writer,
1601        )?;
1602        Ok(())
1603    }
1604
1605    /// Write continuation bytes, and mark the stream as done
1606    pub fn finish(&mut self) -> Result<(), ArrowError> {
1607        if self.finished {
1608            return Err(ArrowError::IpcError(
1609                "Cannot write footer to stream writer as it is closed".to_string(),
1610            ));
1611        }
1612
1613        write_continuation(&mut self.writer, &self.write_options, 0)?;
1614        self.writer.flush()?;
1615
1616        self.finished = true;
1617
1618        Ok(())
1619    }
1620
1621    /// Gets a reference to the underlying writer.
1622    pub fn get_ref(&self) -> &W {
1623        &self.writer
1624    }
1625
1626    /// Gets a mutable reference to the underlying writer.
1627    ///
1628    /// It is inadvisable to directly write to the underlying writer.
1629    pub fn get_mut(&mut self) -> &mut W {
1630        &mut self.writer
1631    }
1632
1633    /// Flush the underlying writer.
1634    ///
1635    /// Both the BufWriter and the underlying writer are flushed.
1636    pub fn flush(&mut self) -> Result<(), ArrowError> {
1637        self.writer.flush()?;
1638        Ok(())
1639    }
1640
1641    /// Unwraps the the underlying writer.
1642    ///
1643    /// The writer is flushed and the StreamWriter is finished before returning.
1644    ///
1645    /// # Errors
1646    ///
1647    /// An ['Err'](Result::Err) may be returned if an error occurs while finishing the StreamWriter
1648    /// or while flushing the writer.
1649    ///
1650    /// # Example
1651    ///
1652    /// ```
1653    /// # use arrow_ipc::writer::{StreamWriter, IpcWriteOptions};
1654    /// # use arrow_ipc::MetadataVersion;
1655    /// # use arrow_schema::{ArrowError, Schema};
1656    /// # fn main() -> Result<(), ArrowError> {
1657    /// // The result we expect from an empty schema
1658    /// let expected = vec![
1659    ///     255, 255, 255, 255,  48,   0,   0,   0,
1660    ///      16,   0,   0,   0,   0,   0,  10,   0,
1661    ///      12,   0,  10,   0,   9,   0,   4,   0,
1662    ///      10,   0,   0,   0,  16,   0,   0,   0,
1663    ///       0,   1,   4,   0,   8,   0,   8,   0,
1664    ///       0,   0,   4,   0,   8,   0,   0,   0,
1665    ///       4,   0,   0,   0,   0,   0,   0,   0,
1666    ///     255, 255, 255, 255,   0,   0,   0,   0
1667    /// ];
1668    ///
1669    /// let schema = Schema::empty();
1670    /// let buffer: Vec<u8> = Vec::new();
1671    /// let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)?;
1672    /// let stream_writer = StreamWriter::try_new_with_options(buffer, &schema, options)?;
1673    ///
1674    /// assert_eq!(stream_writer.into_inner()?, expected);
1675    /// # Ok(())
1676    /// # }
1677    /// ```
1678    pub fn into_inner(mut self) -> Result<W, ArrowError> {
1679        if !self.finished {
1680            // `finish` flushes.
1681            self.finish()?;
1682        }
1683        Ok(self.writer)
1684    }
1685}
1686
1687impl<W: Write> RecordBatchWriter for StreamWriter<W> {
1688    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1689        self.write(batch)
1690    }
1691
1692    fn close(mut self) -> Result<(), ArrowError> {
1693        self.finish()
1694    }
1695}
1696
/// Holds one encoded IPC message: the serialized `crate::Message` metadata plus the optional
/// Arrow body data that accompanies it
pub struct EncodedData {
    /// Bytes of the encoded `crate::Message`
    pub ipc_message: Vec<u8>,
    /// Arrow buffers to write after the message; should be an empty vec for schema messages
    pub arrow_data: Vec<u8>,
}
1704/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
1705pub fn write_message<W: Write>(
1706    mut writer: W,
1707    encoded: EncodedData,
1708    write_options: &IpcWriteOptions,
1709) -> Result<(usize, usize), ArrowError> {
1710    let arrow_data_len = encoded.arrow_data.len();
1711    if arrow_data_len % usize::from(write_options.alignment) != 0 {
1712        return Err(ArrowError::MemoryError(
1713            "Arrow data not aligned".to_string(),
1714        ));
1715    }
1716
1717    let a = usize::from(write_options.alignment - 1);
1718    let buffer = encoded.ipc_message;
1719    let flatbuf_size = buffer.len();
1720    let prefix_size = if write_options.write_legacy_ipc_format {
1721        4
1722    } else {
1723        8
1724    };
1725    let aligned_size = (flatbuf_size + prefix_size + a) & !a;
1726    let padding_bytes = aligned_size - flatbuf_size - prefix_size;
1727
1728    write_continuation(
1729        &mut writer,
1730        write_options,
1731        (aligned_size - prefix_size) as i32,
1732    )?;
1733
1734    // write the flatbuf
1735    if flatbuf_size > 0 {
1736        writer.write_all(&buffer)?;
1737    }
1738    // write padding
1739    writer.write_all(&PADDING[..padding_bytes])?;
1740
1741    // write arrow data
1742    let body_len = if arrow_data_len > 0 {
1743        write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
1744    } else {
1745        0
1746    };
1747
1748    Ok((aligned_size, body_len))
1749}
1750
1751fn write_body_buffers<W: Write>(
1752    mut writer: W,
1753    data: &[u8],
1754    alignment: u8,
1755) -> Result<usize, ArrowError> {
1756    let len = data.len();
1757    let pad_len = pad_to_alignment(alignment, len);
1758    let total_len = len + pad_len;
1759
1760    // write body buffer
1761    writer.write_all(data)?;
1762    if pad_len > 0 {
1763        writer.write_all(&PADDING[..pad_len])?;
1764    }
1765
1766    Ok(total_len)
1767}
1768
1769/// Write a record batch to the writer, writing the message size before the message
1770/// if the record batch is being written to a stream
1771fn write_continuation<W: Write>(
1772    mut writer: W,
1773    write_options: &IpcWriteOptions,
1774    total_len: i32,
1775) -> Result<usize, ArrowError> {
1776    let mut written = 8;
1777
1778    // the version of the writer determines whether continuation markers should be added
1779    match write_options.metadata_version {
1780        crate::MetadataVersion::V1 | crate::MetadataVersion::V2 | crate::MetadataVersion::V3 => {
1781            unreachable!("Options with the metadata version cannot be created")
1782        }
1783        crate::MetadataVersion::V4 => {
1784            if !write_options.write_legacy_ipc_format {
1785                // v0.15.0 format
1786                writer.write_all(&CONTINUATION_MARKER)?;
1787                written = 4;
1788            }
1789            writer.write_all(&total_len.to_le_bytes()[..])?;
1790        }
1791        crate::MetadataVersion::V5 => {
1792            // write continuation marker and message length
1793            writer.write_all(&CONTINUATION_MARKER)?;
1794            writer.write_all(&total_len.to_le_bytes()[..])?;
1795        }
1796        z => panic!("Unsupported crate::MetadataVersion {z:?}"),
1797    };
1798
1799    Ok(written)
1800}
1801
1802/// In V4, null types have no validity bitmap
1803/// In V5 and later, null and union types have no validity bitmap
1804/// Run end encoded type has no validity bitmap.
1805fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) -> bool {
1806    if write_options.metadata_version < crate::MetadataVersion::V5 {
1807        !matches!(data_type, DataType::Null)
1808    } else {
1809        !matches!(
1810            data_type,
1811            DataType::Null | DataType::Union(_, _) | DataType::RunEndEncoded(_, _)
1812        )
1813    }
1814}
1815
1816/// Whether to truncate the buffer
1817#[inline]
1818fn buffer_need_truncate(
1819    array_offset: usize,
1820    buffer: &Buffer,
1821    spec: &BufferSpec,
1822    min_length: usize,
1823) -> bool {
1824    spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
1825}
1826
1827/// Returns byte width for a buffer spec. Only for `BufferSpec::FixedWidth`.
1828#[inline]
1829fn get_buffer_element_width(spec: &BufferSpec) -> usize {
1830    match spec {
1831        BufferSpec::FixedWidth { byte_width, .. } => *byte_width,
1832        _ => 0,
1833    }
1834}
1835
1836/// Common functionality for re-encoding offsets. Returns the new offsets as well as
1837/// original start offset and length for use in slicing child data.
1838fn reencode_offsets<O: OffsetSizeTrait>(
1839    offsets: &Buffer,
1840    data: &ArrayData,
1841) -> (Buffer, usize, usize) {
1842    let offsets_slice: &[O] = offsets.typed_data::<O>();
1843    let offset_slice = &offsets_slice[data.offset()..data.offset() + data.len() + 1];
1844
1845    let start_offset = offset_slice.first().unwrap();
1846    let end_offset = offset_slice.last().unwrap();
1847
1848    let offsets = match start_offset.as_usize() {
1849        0 => {
1850            let size = size_of::<O>();
1851            offsets.slice_with_length(data.offset() * size, (data.len() + 1) * size)
1852        }
1853        _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
1854    };
1855
1856    let start_offset = start_offset.as_usize();
1857    let end_offset = end_offset.as_usize();
1858
1859    (offsets, start_offset, end_offset - start_offset)
1860}
1861
1862/// Returns the values and offsets [`Buffer`] for a ByteArray with offset type `O`
1863///
1864/// In particular, this handles re-encoding the offsets if they don't start at `0`,
1865/// slicing the values buffer as appropriate. This helps reduce the encoded
1866/// size of sliced arrays, as values that have been sliced away are not encoded
1867fn get_byte_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Buffer) {
1868    if data.is_empty() {
1869        // As per specification, offsets buffer has N+1 elements.
1870        // So an empty array should still be encoded with a single 0 offset.
1871        let mut offsets = MutableBuffer::new(size_of::<O>());
1872        offsets.extend_from_slice(O::usize_as(0).to_byte_slice());
1873        return (offsets.into(), MutableBuffer::new(0).into());
1874    }
1875
1876    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1877    let values = data.buffers()[1].slice_with_length(original_start_offset, len);
1878    (offsets, values)
1879}
1880
1881/// Similar logic as [`get_byte_array_buffers()`] but slices the child array instead
1882/// of a values buffer.
1883fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, ArrayData) {
1884    if data.is_empty() {
1885        // As per specification, offsets buffer has N+1 elements.
1886        // So an empty array should still be encoded with a single 0 offset.
1887        let mut offsets = MutableBuffer::new(size_of::<O>());
1888        offsets.extend_from_slice(O::usize_as(0).to_byte_slice());
1889        return (offsets.into(), data.child_data()[0].slice(0, 0));
1890    }
1891
1892    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1893    let child_data = data.child_data()[0].slice(original_start_offset, len);
1894    (offsets, child_data)
1895}
1896
1897/// Returns the offsets, sizes, and child data buffers for a ListView array.
1898///
1899/// Unlike List arrays, ListView arrays store both offsets and sizes explicitly,
1900/// and offsets can be non-monotonic. When slicing, we simply pass through the
1901/// offsets and sizes without re-encoding, and do not slice the child data.
1902fn get_list_view_array_buffers<O: OffsetSizeTrait>(
1903    data: &ArrayData,
1904) -> (Buffer, Buffer, ArrayData) {
1905    if data.is_empty() {
1906        return (
1907            MutableBuffer::new(0).into(),
1908            MutableBuffer::new(0).into(),
1909            data.child_data()[0].slice(0, 0),
1910        );
1911    }
1912
1913    let offsets = &data.buffers()[0];
1914    let sizes = &data.buffers()[1];
1915
1916    let element_size = std::mem::size_of::<O>();
1917    let offsets_slice =
1918        offsets.slice_with_length(data.offset() * element_size, data.len() * element_size);
1919    let sizes_slice =
1920        sizes.slice_with_length(data.offset() * element_size, data.len() * element_size);
1921
1922    let child_data = data.child_data()[0].clone();
1923
1924    (offsets_slice, sizes_slice, child_data)
1925}
1926
1927/// Returns the sliced views [`Buffer`] for a BinaryView/Utf8View array.
1928///
1929/// The views buffer is sliced to only include views in the valid range based on
1930/// the array's offset and length. This helps reduce the encoded size of sliced
1931/// arrays
1932///
1933fn get_or_truncate_buffer(array_data: &ArrayData) -> Buffer {
1934    let buffer = &array_data.buffers()[0];
1935    let layout = layout(array_data.data_type());
1936    let spec = &layout.buffers[0];
1937
1938    let byte_width = get_buffer_element_width(spec);
1939    let min_length = array_data.len() * byte_width;
1940    if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
1941        let byte_offset = array_data.offset() * byte_width;
1942        let buffer_length = min(min_length, buffer.len() - byte_offset);
1943        buffer.slice_with_length(byte_offset, buffer_length)
1944    } else {
1945        buffer.clone()
1946    }
1947}
1948
1949/// Recursively encodes `array_data` into its IPC representation.
1950///
1951/// Output goes to two separate channels:
1952/// - `meta`: accumulates IPC metadata (`nodes` and `buffers`) for the flatbuffer header.
1953/// - `sink`: the raw Arrow data bytes that form the IPC message body.
1954fn write_array_data(
1955    array_data: &ArrayData,
1956    meta: &mut IpcMetadataBuilder,
1957    sink: &mut IpcBodySink<'_>,
1958    offset: i64,
1959    compression_codec: Option<CompressionCodec>,
1960    compression_context: &mut CompressionContext,
1961    write_options: &IpcWriteOptions,
1962) -> Result<i64, ArrowError> {
1963    let mut offset = offset;
1964    let num_rows = array_data.len();
1965    if !matches!(array_data.data_type(), DataType::Null) {
1966        meta.nodes.push(crate::FieldNode::new(
1967            num_rows as i64,
1968            array_data.null_count() as i64,
1969        ));
1970    } else {
1971        // NullArray's null_count equals to len, but ArrayData null_count is always 0.
1972        meta.nodes
1973            .push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
1974    }
1975    if has_validity_bitmap(array_data.data_type(), write_options) {
1976        // write null buffer if exists
1977        let null_buffer = match array_data.nulls() {
1978            None => {
1979                // create a buffer and fill it with valid bits
1980                let num_bytes = bit_util::ceil(num_rows, 8);
1981                let buffer = MutableBuffer::new(num_bytes);
1982                let buffer = buffer.with_bitset(num_bytes, true);
1983                buffer.into()
1984            }
1985            Some(buffer) => buffer.inner().sliced(),
1986        };
1987
1988        offset = encode_sink_buffer(
1989            null_buffer,
1990            meta,
1991            sink,
1992            offset,
1993            compression_codec,
1994            compression_context,
1995            write_options.alignment,
1996        )?;
1997    }
1998
1999    let data_type = array_data.data_type();
2000    if matches!(data_type, DataType::Binary | DataType::Utf8) {
2001        let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
2002        for buffer in [offsets, values] {
2003            offset = encode_sink_buffer(
2004                buffer,
2005                meta,
2006                sink,
2007                offset,
2008                compression_codec,
2009                compression_context,
2010                write_options.alignment,
2011            )?;
2012        }
2013    } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
2014        // Slicing the views buffer is safe and easy,
2015        // but pruning unneeded data buffers is much more nuanced since it's complicated to prove that no views reference the pruned buffers
2016        //
2017        // Current implementation just serialize the raw arrays as given and not try to optimize anything.
2018        // If users wants to "compact" the arrays prior to sending them over IPC,
2019        // they should consider the gc API suggested in #5513
2020        let views = get_or_truncate_buffer(array_data);
2021        offset = encode_sink_buffer(
2022            views,
2023            meta,
2024            sink,
2025            offset,
2026            compression_codec,
2027            compression_context,
2028            write_options.alignment,
2029        )?;
2030
2031        for buffer in array_data.buffers().iter().skip(1) {
2032            offset = encode_sink_buffer(
2033                buffer.clone(),
2034                meta,
2035                sink,
2036                offset,
2037                compression_codec,
2038                compression_context,
2039                write_options.alignment,
2040            )?;
2041        }
2042    } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
2043        let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
2044        for buffer in [offsets, values] {
2045            offset = encode_sink_buffer(
2046                buffer,
2047                meta,
2048                sink,
2049                offset,
2050                compression_codec,
2051                compression_context,
2052                write_options.alignment,
2053            )?;
2054        }
2055    } else if DataType::is_numeric(data_type)
2056        || DataType::is_temporal(data_type)
2057        || matches!(
2058            array_data.data_type(),
2059            DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
2060        )
2061    {
2062        // Truncate values
2063        assert_eq!(array_data.buffers().len(), 1);
2064
2065        let buffer = get_or_truncate_buffer(array_data);
2066        offset = encode_sink_buffer(
2067            buffer,
2068            meta,
2069            sink,
2070            offset,
2071            compression_codec,
2072            compression_context,
2073            write_options.alignment,
2074        )?;
2075    } else if matches!(data_type, DataType::Boolean) {
2076        // Bools are special because the payload (= 1 bit) is smaller than the physical container elements (= bytes).
2077        // The array data may not start at the physical boundary of the underlying buffer, so we need to shift bits around.
2078        assert_eq!(array_data.buffers().len(), 1);
2079
2080        let buffer = &array_data.buffers()[0];
2081        let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
2082        offset = encode_sink_buffer(
2083            buffer,
2084            meta,
2085            sink,
2086            offset,
2087            compression_codec,
2088            compression_context,
2089            write_options.alignment,
2090        )?;
2091    } else if matches!(
2092        data_type,
2093        DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _)
2094    ) {
2095        assert_eq!(array_data.buffers().len(), 1);
2096        assert_eq!(array_data.child_data().len(), 1);
2097
2098        // Truncate offsets and the child data to avoid writing unnecessary data
2099        let (offsets, sliced_child_data) = match data_type {
2100            DataType::List(_) => get_list_array_buffers::<i32>(array_data),
2101            DataType::Map(_, _) => get_list_array_buffers::<i32>(array_data),
2102            DataType::LargeList(_) => get_list_array_buffers::<i64>(array_data),
2103            _ => unreachable!(),
2104        };
2105        offset = encode_sink_buffer(
2106            offsets,
2107            meta,
2108            sink,
2109            offset,
2110            compression_codec,
2111            compression_context,
2112            write_options.alignment,
2113        )?;
2114        offset = write_array_data(
2115            &sliced_child_data,
2116            meta,
2117            sink,
2118            offset,
2119            compression_codec,
2120            compression_context,
2121            write_options,
2122        )?;
2123        return Ok(offset);
2124    } else if matches!(
2125        data_type,
2126        DataType::ListView(_) | DataType::LargeListView(_)
2127    ) {
2128        assert_eq!(array_data.buffers().len(), 2); // offsets + sizes
2129        assert_eq!(array_data.child_data().len(), 1);
2130
2131        let (offsets, sizes, child_data) = match data_type {
2132            DataType::ListView(_) => get_list_view_array_buffers::<i32>(array_data),
2133            DataType::LargeListView(_) => get_list_view_array_buffers::<i64>(array_data),
2134            _ => unreachable!(),
2135        };
2136
2137        offset = encode_sink_buffer(
2138            offsets,
2139            meta,
2140            sink,
2141            offset,
2142            compression_codec,
2143            compression_context,
2144            write_options.alignment,
2145        )?;
2146        offset = encode_sink_buffer(
2147            sizes,
2148            meta,
2149            sink,
2150            offset,
2151            compression_codec,
2152            compression_context,
2153            write_options.alignment,
2154        )?;
2155
2156        offset = write_array_data(
2157            &child_data,
2158            meta,
2159            sink,
2160            offset,
2161            compression_codec,
2162            compression_context,
2163            write_options,
2164        )?;
2165        return Ok(offset);
2166    } else if let DataType::FixedSizeList(_, fixed_size) = data_type {
2167        assert_eq!(array_data.child_data().len(), 1);
2168        let fixed_size = *fixed_size as usize;
2169
2170        let child_offset = array_data.offset() * fixed_size;
2171        let child_length = array_data.len() * fixed_size;
2172        let child_data = array_data.child_data()[0].slice(child_offset, child_length);
2173
2174        offset = write_array_data(
2175            &child_data,
2176            meta,
2177            sink,
2178            offset,
2179            compression_codec,
2180            compression_context,
2181            write_options,
2182        )?;
2183        return Ok(offset);
2184    } else {
2185        for buffer in array_data.buffers() {
2186            offset = encode_sink_buffer(
2187                buffer.clone(),
2188                meta,
2189                sink,
2190                offset,
2191                compression_codec,
2192                compression_context,
2193                write_options.alignment,
2194            )?;
2195        }
2196    }
2197
2198    match array_data.data_type() {
2199        DataType::Dictionary(_, _) => {}
2200        DataType::RunEndEncoded(_, _) => {
2201            // unslice the run encoded array.
2202            let arr = unslice_run_array(array_data.clone())?;
2203            // recursively write out nested structures
2204            for data_ref in arr.child_data() {
2205                // write the nested data (e.g list data)
2206                offset = write_array_data(
2207                    data_ref,
2208                    meta,
2209                    sink,
2210                    offset,
2211                    compression_codec,
2212                    compression_context,
2213                    write_options,
2214                )?;
2215            }
2216        }
2217        _ => {
2218            // recursively write out nested structures
2219            for data_ref in array_data.child_data() {
2220                // write the nested data (e.g list data)
2221                offset = write_array_data(
2222                    data_ref,
2223                    meta,
2224                    sink,
2225                    offset,
2226                    compression_codec,
2227                    compression_context,
2228                    write_options,
2229                )?;
2230            }
2231        }
2232    }
2233    Ok(offset)
2234}
2235
2236/// Encodes a single Arrow [`Buffer`] into the IPC body and records its metadata.
2237///
2238/// - `buffer`: the Arrow data buffer to encode (validity bitmap, offsets, values, etc.)
2239/// - `buffers`: in-progress list of IPC `Buffer` metadata entries (body offset + length) that
2240///   will eventually be serialised into the flatbuffer `RecordBatch` header.
2241/// - `sink`: destination for the actual encoded bytes; either a contiguous `Vec<u8>` for
2242///   in-memory writes, or a list of [`EncodedBuffer`] segments for deferred zero-copy streaming.
2243/// - `offset`: running byte offset into the IPC message body, used to compute the metadata entry.
2244/// - `compression_codec` / `compression_context`: if `Some`, the buffer is compressed before
2245///   writing; `compression_context` provides reusable scratch space across calls.
2246/// - `alignment`: each buffer is padded to this many bytes so the next buffer starts aligned.
2247///
2248/// Returns the updated `offset` (advanced by the encoded length plus any alignment padding).
2249fn encode_sink_buffer(
2250    buffer: Buffer,
2251    ipc_meta_data: &mut IpcMetadataBuilder,
2252    sink: &mut IpcBodySink<'_>,
2253    offset: i64,
2254    compression_codec: Option<CompressionCodec>,
2255    compression_context: &mut CompressionContext,
2256    alignment: u8,
2257) -> Result<i64, ArrowError> {
2258    let (encoded, len) = match compression_codec {
2259        None => {
2260            let len = buffer.len() as i64;
2261            (EncodedBuffer::Raw(buffer), len)
2262        }
2263        Some(codec) => {
2264            let mut scratch = Vec::new();
2265            let written =
2266                codec.compress_to_vec(buffer.as_slice(), &mut scratch, compression_context)?;
2267            let len = i64::try_from(written)
2268                .map_err(|e| ArrowError::InvalidArgumentError(format!("{e}")))?;
2269            (EncodedBuffer::Compressed(scratch), len)
2270        }
2271    };
2272
2273    let pad_len = pad_to_alignment(alignment, len as usize);
2274    sink.write(pad_len, encoded);
2275    ipc_meta_data.buffers.push(crate::Buffer::new(offset, len));
2276    Ok(offset + len + pad_len as i64)
2277}
2278
/// Block of zero bytes that padding is sliced from when aligning written data
const PADDING: [u8; 64] = [0_u8; 64];
2280
2281/// Estimates the number of [`EncodedBuffer`] segments that [`write_array_data`]
2282/// will produce for a column of the given type.
2283///
2284/// Based on the Arrow IPC buffer layout
2285/// (<https://arrow.apache.org/docs/format/Columnar.html#recordbatch-message>):
2286#[inline]
2287fn estimate_encoded_buffer_count(dt: &DataType) -> usize {
2288    match dt {
2289        DataType::Null => 0,
2290
2291        DataType::Binary | DataType::Utf8 | DataType::LargeBinary | DataType::LargeUtf8 => 3,
2292
2293        DataType::BinaryView | DataType::Utf8View => 3,
2294
2295        DataType::List(f) | DataType::LargeList(f) | DataType::Map(f, _) => {
2296            2 + estimate_encoded_buffer_count(f.data_type())
2297        }
2298
2299        DataType::ListView(f) | DataType::LargeListView(f) => {
2300            3 + estimate_encoded_buffer_count(f.data_type())
2301        }
2302
2303        DataType::FixedSizeList(f, _) => 1 + estimate_encoded_buffer_count(f.data_type()),
2304
2305        DataType::Struct(fields) => {
2306            1 + fields
2307                .iter()
2308                .map(|f| estimate_encoded_buffer_count(f.data_type()))
2309                .sum::<usize>()
2310        }
2311
2312        // Dictionary indices only; dictionary body is a separate IPC message.
2313        DataType::Dictionary(_, _) => 2,
2314
2315        DataType::Union(fields, UnionMode::Sparse) => {
2316            1 + fields
2317                .iter()
2318                .map(|(_, f)| estimate_encoded_buffer_count(f.data_type()))
2319                .sum::<usize>()
2320        }
2321        DataType::Union(fields, UnionMode::Dense) => {
2322            2 + fields
2323                .iter()
2324                .map(|(_, f)| estimate_encoded_buffer_count(f.data_type()))
2325                .sum::<usize>()
2326        }
2327
2328        DataType::RunEndEncoded(run_ends, values) => {
2329            estimate_encoded_buffer_count(run_ends.data_type())
2330                + estimate_encoded_buffer_count(values.data_type())
2331        }
2332        // Primitive, Bool, temporal, Decimal*, FixedSizeBinary: validity + values.
2333        _ => 2,
2334    }
2335}
2336
/// Number of padding bytes required to extend `len` to the next multiple of `alignment`.
///
/// The rounding is done with a bit mask, so `alignment` is expected to be a power of two.
#[inline]
fn pad_to_alignment(alignment: u8, len: usize) -> usize {
    let mask = usize::from(alignment - 1);
    let rounded_up = (len + mask) & !mask;
    rounded_up - len
}
2343
2344#[cfg(test)]
2345mod tests {
2346    use std::hash::Hasher;
2347    use std::io::Cursor;
2348    use std::io::Seek;
2349
2350    use arrow_array::builder::FixedSizeListBuilder;
2351    use arrow_array::builder::Float32Builder;
2352    use arrow_array::builder::Int64Builder;
2353    use arrow_array::builder::MapBuilder;
2354    use arrow_array::builder::StringViewBuilder;
2355    use arrow_array::builder::UnionBuilder;
2356    use arrow_array::builder::{
2357        GenericListBuilder, GenericListViewBuilder, ListBuilder, StringBuilder,
2358    };
2359    use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
2360    use arrow_array::types::*;
2361    use arrow_buffer::ScalarBuffer;
2362
2363    use crate::MetadataVersion;
2364    use crate::convert::fb_to_schema;
2365    use crate::reader::*;
2366    use crate::root_as_footer;
2367
2368    use super::*;
2369
2370    fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
2371        let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
2372        writer.write(rb).unwrap();
2373        writer.finish().unwrap();
2374        writer.into_inner().unwrap()
2375    }
2376
2377    fn deserialize_file(bytes: Vec<u8>) -> RecordBatch {
2378        let mut reader = FileReader::try_new(Cursor::new(bytes), None).unwrap();
2379        reader.next().unwrap().unwrap()
2380    }
2381
2382    fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
2383        // Use 8-byte alignment so that the various `truncate_*` tests can be compactly written,
2384        // without needing to construct a giant array to spill over the 64-byte default alignment
2385        // boundary.
2386        const IPC_ALIGNMENT: usize = 8;
2387
2388        let mut stream_writer = StreamWriter::try_new_with_options(
2389            vec![],
2390            record.schema_ref(),
2391            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
2392        )
2393        .unwrap();
2394        stream_writer.write(record).unwrap();
2395        stream_writer.finish().unwrap();
2396        stream_writer.into_inner().unwrap()
2397    }
2398
2399    fn deserialize_stream(bytes: Vec<u8>) -> RecordBatch {
2400        let mut stream_reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
2401        stream_reader.next().unwrap().unwrap()
2402    }
2403
2404    #[test]
2405    #[cfg(feature = "lz4")]
2406    fn test_write_empty_record_batch_lz4_compression() {
2407        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2408        let values: Vec<Option<i32>> = vec![];
2409        let array = Int32Array::from(values);
2410        let record_batch =
2411            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2412
2413        let mut file = tempfile::tempfile().unwrap();
2414
2415        {
2416            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2417                .unwrap()
2418                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
2419                .unwrap();
2420
2421            let mut writer =
2422                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2423            writer.write(&record_batch).unwrap();
2424            writer.finish().unwrap();
2425        }
2426        file.rewind().unwrap();
2427        {
2428            // read file
2429            let reader = FileReader::try_new(file, None).unwrap();
2430            for read_batch in reader {
2431                read_batch
2432                    .unwrap()
2433                    .columns()
2434                    .iter()
2435                    .zip(record_batch.columns())
2436                    .for_each(|(a, b)| {
2437                        assert_eq!(a.data_type(), b.data_type());
2438                        assert_eq!(a.len(), b.len());
2439                        assert_eq!(a.null_count(), b.null_count());
2440                    });
2441            }
2442        }
2443    }
2444
2445    #[test]
2446    #[cfg(feature = "lz4")]
2447    fn test_write_file_with_lz4_compression() {
2448        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2449        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
2450        let array = Int32Array::from(values);
2451        let record_batch =
2452            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2453
2454        let mut file = tempfile::tempfile().unwrap();
2455        {
2456            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2457                .unwrap()
2458                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
2459                .unwrap();
2460
2461            let mut writer =
2462                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2463            writer.write(&record_batch).unwrap();
2464            writer.finish().unwrap();
2465        }
2466        file.rewind().unwrap();
2467        {
2468            // read file
2469            let reader = FileReader::try_new(file, None).unwrap();
2470            for read_batch in reader {
2471                read_batch
2472                    .unwrap()
2473                    .columns()
2474                    .iter()
2475                    .zip(record_batch.columns())
2476                    .for_each(|(a, b)| {
2477                        assert_eq!(a.data_type(), b.data_type());
2478                        assert_eq!(a.len(), b.len());
2479                        assert_eq!(a.null_count(), b.null_count());
2480                    });
2481            }
2482        }
2483    }
2484
2485    #[test]
2486    #[cfg(feature = "zstd")]
2487    fn test_write_file_with_zstd_compression() {
2488        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2489        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
2490        let array = Int32Array::from(values);
2491        let record_batch =
2492            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2493        let mut file = tempfile::tempfile().unwrap();
2494        {
2495            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2496                .unwrap()
2497                .try_with_compression(Some(crate::CompressionType::ZSTD))
2498                .unwrap();
2499
2500            let mut writer =
2501                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2502            writer.write(&record_batch).unwrap();
2503            writer.finish().unwrap();
2504        }
2505        file.rewind().unwrap();
2506        {
2507            // read file
2508            let reader = FileReader::try_new(file, None).unwrap();
2509            for read_batch in reader {
2510                read_batch
2511                    .unwrap()
2512                    .columns()
2513                    .iter()
2514                    .zip(record_batch.columns())
2515                    .for_each(|(a, b)| {
2516                        assert_eq!(a.data_type(), b.data_type());
2517                        assert_eq!(a.len(), b.len());
2518                        assert_eq!(a.null_count(), b.null_count());
2519                    });
2520            }
2521        }
2522    }
2523
2524    #[test]
2525    fn test_write_file() {
2526        let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
2527        let values: Vec<Option<u32>> = vec![
2528            Some(999),
2529            None,
2530            Some(235),
2531            Some(123),
2532            None,
2533            None,
2534            None,
2535            None,
2536            None,
2537        ];
2538        let array1 = UInt32Array::from(values);
2539        let batch =
2540            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array1) as ArrayRef])
2541                .unwrap();
2542        let mut file = tempfile::tempfile().unwrap();
2543        {
2544            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2545
2546            writer.write(&batch).unwrap();
2547            writer.finish().unwrap();
2548        }
2549        file.rewind().unwrap();
2550
2551        {
2552            let mut reader = FileReader::try_new(file, None).unwrap();
2553            while let Some(Ok(read_batch)) = reader.next() {
2554                read_batch
2555                    .columns()
2556                    .iter()
2557                    .zip(batch.columns())
2558                    .for_each(|(a, b)| {
2559                        assert_eq!(a.data_type(), b.data_type());
2560                        assert_eq!(a.len(), b.len());
2561                        assert_eq!(a.null_count(), b.null_count());
2562                    });
2563            }
2564        }
2565    }
2566
2567    #[test]
2568    fn test_empty_utf8_ipc_writes_nonempty_offsets_buffer() {
2569        let name = StringArray::from(Vec::<String>::new());
2570        let (offsets, values) = get_byte_array_buffers::<i32>(&name.to_data());
2571
2572        assert_eq!(name.len(), 0);
2573        assert_eq!(
2574            offsets.len(),
2575            std::mem::size_of::<i32>(),
2576            "offsets buffer should contain one zero i32 offset"
2577        );
2578        assert_eq!(values.len(), 0, "values buffer should remain empty");
2579    }
2580
2581    #[test]
2582    fn test_empty_large_utf8_ipc_writes_nonempty_offsets_buffer() {
2583        let name = LargeStringArray::from(Vec::<String>::new());
2584        let (offsets, values) = get_byte_array_buffers::<i64>(&name.to_data());
2585
2586        assert_eq!(name.len(), 0);
2587        assert_eq!(
2588            offsets.len(),
2589            std::mem::size_of::<i64>(),
2590            "offsets buffer should contain one zero i64 offset"
2591        );
2592        assert_eq!(values.len(), 0, "values buffer should remain empty");
2593    }
2594
2595    #[test]
2596    fn test_empty_list_ipc_writes_nonempty_offsets_buffer() {
2597        let list = GenericListBuilder::<i32, _>::new(UInt32Builder::new()).finish();
2598        let (offsets, child_data) = get_list_array_buffers::<i32>(&list.to_data());
2599
2600        assert_eq!(list.len(), 0);
2601        assert_eq!(
2602            offsets.len(),
2603            std::mem::size_of::<i32>(),
2604            "offsets buffer should contain one zero i32 offset"
2605        );
2606        assert_eq!(child_data.len(), 0, "child data should remain empty");
2607    }
2608
2609    #[test]
2610    fn test_empty_large_list_ipc_writes_nonempty_offsets_buffer() {
2611        let list = GenericListBuilder::<i64, _>::new(UInt32Builder::new()).finish();
2612        let (offsets, child_data) = get_list_array_buffers::<i64>(&list.to_data());
2613
2614        assert_eq!(list.len(), 0);
2615        assert_eq!(
2616            offsets.len(),
2617            std::mem::size_of::<i64>(),
2618            "offsets buffer should contain one zero i64 offset"
2619        );
2620        assert_eq!(child_data.len(), 0, "child data should remain empty");
2621    }
2622
2623    fn write_null_file(options: IpcWriteOptions) {
2624        let schema = Schema::new(vec![
2625            Field::new("nulls", DataType::Null, true),
2626            Field::new("int32s", DataType::Int32, false),
2627            Field::new("nulls2", DataType::Null, true),
2628            Field::new("f64s", DataType::Float64, false),
2629        ]);
2630        let array1 = NullArray::new(32);
2631        let array2 = Int32Array::from(vec![1; 32]);
2632        let array3 = NullArray::new(32);
2633        let array4 = Float64Array::from(vec![f64::NAN; 32]);
2634        let batch = RecordBatch::try_new(
2635            Arc::new(schema.clone()),
2636            vec![
2637                Arc::new(array1) as ArrayRef,
2638                Arc::new(array2) as ArrayRef,
2639                Arc::new(array3) as ArrayRef,
2640                Arc::new(array4) as ArrayRef,
2641            ],
2642        )
2643        .unwrap();
2644        let mut file = tempfile::tempfile().unwrap();
2645        {
2646            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2647
2648            writer.write(&batch).unwrap();
2649            writer.finish().unwrap();
2650        }
2651
2652        file.rewind().unwrap();
2653
2654        {
2655            let reader = FileReader::try_new(file, None).unwrap();
2656            reader.for_each(|maybe_batch| {
2657                maybe_batch
2658                    .unwrap()
2659                    .columns()
2660                    .iter()
2661                    .zip(batch.columns())
2662                    .for_each(|(a, b)| {
2663                        assert_eq!(a.data_type(), b.data_type());
2664                        assert_eq!(a.len(), b.len());
2665                        assert_eq!(a.null_count(), b.null_count());
2666                    });
2667            });
2668        }
2669    }
2670    #[test]
2671    fn test_write_null_file_v4() {
2672        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2673        write_null_file(IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap());
2674        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V4).unwrap());
2675        write_null_file(IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap());
2676    }
2677
2678    #[test]
2679    fn test_write_null_file_v5() {
2680        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2681        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V5).unwrap());
2682    }
2683
2684    #[test]
2685    fn track_union_nested_dict() {
2686        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2687
2688        let array = Arc::new(inner) as ArrayRef;
2689
2690        // Dict field with id 2
2691        #[allow(deprecated)]
2692        let dctfield = Field::new_dict("dict", array.data_type().clone(), false, 0, false);
2693        let union_fields = [(0, Arc::new(dctfield))].into_iter().collect();
2694
2695        let types = [0, 0, 0].into_iter().collect::<ScalarBuffer<i8>>();
2696        let offsets = [0, 1, 2].into_iter().collect::<ScalarBuffer<i32>>();
2697
2698        let union = UnionArray::try_new(union_fields, types, Some(offsets), vec![array]).unwrap();
2699
2700        let schema = Arc::new(Schema::new(vec![Field::new(
2701            "union",
2702            union.data_type().clone(),
2703            false,
2704        )]));
2705
2706        let r#gen = IpcDataGenerator::default();
2707        let mut dict_tracker = DictionaryTracker::new(false);
2708        r#gen.schema_to_bytes_with_dictionary_tracker(
2709            &schema,
2710            &mut dict_tracker,
2711            &IpcWriteOptions::default(),
2712        );
2713
2714        let batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
2715
2716        r#gen
2717            .encode(
2718                &batch,
2719                &mut dict_tracker,
2720                &Default::default(),
2721                &mut Default::default(),
2722            )
2723            .unwrap();
2724
2725        // The encoder will assign dict IDs itself to ensure uniqueness and ignore the dict ID in the schema
2726        // so we expect the dict will be keyed to 0
2727        assert!(dict_tracker.written.contains_key(&0));
2728    }
2729
2730    #[test]
2731    fn track_struct_nested_dict() {
2732        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2733
2734        let array = Arc::new(inner) as ArrayRef;
2735
2736        // Dict field with id 2
2737        #[allow(deprecated)]
2738        let dctfield = Arc::new(Field::new_dict(
2739            "dict",
2740            array.data_type().clone(),
2741            false,
2742            2,
2743            false,
2744        ));
2745
2746        let s = StructArray::from(vec![(dctfield, array)]);
2747        let struct_array = Arc::new(s) as ArrayRef;
2748
2749        let schema = Arc::new(Schema::new(vec![Field::new(
2750            "struct",
2751            struct_array.data_type().clone(),
2752            false,
2753        )]));
2754
2755        let r#gen = IpcDataGenerator::default();
2756        let mut dict_tracker = DictionaryTracker::new(false);
2757        r#gen.schema_to_bytes_with_dictionary_tracker(
2758            &schema,
2759            &mut dict_tracker,
2760            &IpcWriteOptions::default(),
2761        );
2762
2763        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2764
2765        r#gen
2766            .encode(
2767                &batch,
2768                &mut dict_tracker,
2769                &Default::default(),
2770                &mut Default::default(),
2771            )
2772            .unwrap();
2773
2774        assert!(dict_tracker.written.contains_key(&0));
2775    }
2776
2777    fn write_union_file(options: IpcWriteOptions) {
2778        let schema = Schema::new(vec![Field::new_union(
2779            "union",
2780            vec![0, 1],
2781            vec![
2782                Field::new("a", DataType::Int32, false),
2783                Field::new("c", DataType::Float64, false),
2784            ],
2785            UnionMode::Sparse,
2786        )]);
2787        let mut builder = UnionBuilder::with_capacity_sparse(5);
2788        builder.append::<Int32Type>("a", 1).unwrap();
2789        builder.append_null::<Int32Type>("a").unwrap();
2790        builder.append::<Float64Type>("c", 3.0).unwrap();
2791        builder.append_null::<Float64Type>("c").unwrap();
2792        builder.append::<Int32Type>("a", 4).unwrap();
2793        let union = builder.build().unwrap();
2794
2795        let batch =
2796            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(union) as ArrayRef])
2797                .unwrap();
2798
2799        let mut file = tempfile::tempfile().unwrap();
2800        {
2801            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2802
2803            writer.write(&batch).unwrap();
2804            writer.finish().unwrap();
2805        }
2806        file.rewind().unwrap();
2807
2808        {
2809            let reader = FileReader::try_new(file, None).unwrap();
2810            reader.for_each(|maybe_batch| {
2811                maybe_batch
2812                    .unwrap()
2813                    .columns()
2814                    .iter()
2815                    .zip(batch.columns())
2816                    .for_each(|(a, b)| {
2817                        assert_eq!(a.data_type(), b.data_type());
2818                        assert_eq!(a.len(), b.len());
2819                        assert_eq!(a.null_count(), b.null_count());
2820                    });
2821            });
2822        }
2823    }
2824
2825    #[test]
2826    fn test_write_union_file_v4_v5() {
2827        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2828        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2829    }
2830
2831    #[test]
2832    fn test_write_view_types() {
2833        const LONG_TEST_STRING: &str =
2834            "This is a long string to make sure binary view array handles it";
2835        let schema = Schema::new(vec![
2836            Field::new("field1", DataType::BinaryView, true),
2837            Field::new("field2", DataType::Utf8View, true),
2838        ]);
2839        let values: Vec<Option<&[u8]>> = vec![
2840            Some(b"foo"),
2841            Some(b"bar"),
2842            Some(LONG_TEST_STRING.as_bytes()),
2843        ];
2844        let binary_array = BinaryViewArray::from_iter(values);
2845        let utf8_array =
2846            StringViewArray::from_iter(vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]);
2847        let record_batch = RecordBatch::try_new(
2848            Arc::new(schema.clone()),
2849            vec![Arc::new(binary_array), Arc::new(utf8_array)],
2850        )
2851        .unwrap();
2852
2853        let mut file = tempfile::tempfile().unwrap();
2854        {
2855            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2856            writer.write(&record_batch).unwrap();
2857            writer.finish().unwrap();
2858        }
2859        file.rewind().unwrap();
2860        {
2861            let mut reader = FileReader::try_new(&file, None).unwrap();
2862            let read_batch = reader.next().unwrap().unwrap();
2863            read_batch
2864                .columns()
2865                .iter()
2866                .zip(record_batch.columns())
2867                .for_each(|(a, b)| {
2868                    assert_eq!(a, b);
2869                });
2870        }
2871        file.rewind().unwrap();
2872        {
2873            let mut reader = FileReader::try_new(&file, Some(vec![0])).unwrap();
2874            let read_batch = reader.next().unwrap().unwrap();
2875            assert_eq!(read_batch.num_columns(), 1);
2876            let read_array = read_batch.column(0);
2877            let write_array = record_batch.column(0);
2878            assert_eq!(read_array, write_array);
2879        }
2880    }
2881
2882    #[test]
2883    fn truncate_ipc_record_batch() {
2884        fn create_batch(rows: usize) -> RecordBatch {
2885            let schema = Schema::new(vec![
2886                Field::new("a", DataType::Int32, false),
2887                Field::new("b", DataType::Utf8, false),
2888            ]);
2889
2890            let a = Int32Array::from_iter_values(0..rows as i32);
2891            let b = StringArray::from_iter_values((0..rows).map(|i| i.to_string()));
2892
2893            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
2894        }
2895
2896        let big_record_batch = create_batch(65536);
2897
2898        let length = 5;
2899        let small_record_batch = create_batch(length);
2900
2901        let offset = 2;
2902        let record_batch_slice = big_record_batch.slice(offset, length);
2903        assert!(
2904            serialize_stream(&big_record_batch).len() > serialize_stream(&small_record_batch).len()
2905        );
2906        assert_eq!(
2907            serialize_stream(&small_record_batch).len(),
2908            serialize_stream(&record_batch_slice).len()
2909        );
2910
2911        assert_eq!(
2912            deserialize_stream(serialize_stream(&record_batch_slice)),
2913            record_batch_slice
2914        );
2915    }
2916
2917    #[test]
2918    fn truncate_ipc_record_batch_with_nulls() {
2919        fn create_batch() -> RecordBatch {
2920            let schema = Schema::new(vec![
2921                Field::new("a", DataType::Int32, true),
2922                Field::new("b", DataType::Utf8, true),
2923            ]);
2924
2925            let a = Int32Array::from(vec![Some(1), None, Some(1), None, Some(1)]);
2926            let b = StringArray::from(vec![None, Some("a"), Some("a"), None, Some("a")]);
2927
2928            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
2929        }
2930
2931        let record_batch = create_batch();
2932        let record_batch_slice = record_batch.slice(1, 2);
2933        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2934
2935        assert!(
2936            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2937        );
2938
2939        assert!(deserialized_batch.column(0).is_null(0));
2940        assert!(deserialized_batch.column(0).is_valid(1));
2941        assert!(deserialized_batch.column(1).is_valid(0));
2942        assert!(deserialized_batch.column(1).is_valid(1));
2943
2944        assert_eq!(record_batch_slice, deserialized_batch);
2945    }
2946
2947    #[test]
2948    fn truncate_ipc_dictionary_array() {
2949        fn create_batch() -> RecordBatch {
2950            let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
2951                .into_iter()
2952                .collect();
2953            let keys: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
2954
2955            let array = DictionaryArray::new(keys, Arc::new(values));
2956
2957            let schema = Schema::new(vec![Field::new("dict", array.data_type().clone(), true)]);
2958
2959            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
2960        }
2961
2962        let record_batch = create_batch();
2963        let record_batch_slice = record_batch.slice(1, 2);
2964        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2965
2966        assert!(
2967            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2968        );
2969
2970        assert!(deserialized_batch.column(0).is_valid(0));
2971        assert!(deserialized_batch.column(0).is_null(1));
2972
2973        assert_eq!(record_batch_slice, deserialized_batch);
2974    }
2975
2976    #[test]
2977    fn truncate_ipc_struct_array() {
2978        fn create_batch() -> RecordBatch {
2979            let strings: StringArray = [Some("foo"), None, Some("bar"), Some("baz")]
2980                .into_iter()
2981                .collect();
2982            let ints: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
2983
2984            let struct_array = StructArray::from(vec![
2985                (
2986                    Arc::new(Field::new("s", DataType::Utf8, true)),
2987                    Arc::new(strings) as ArrayRef,
2988                ),
2989                (
2990                    Arc::new(Field::new("c", DataType::Int32, true)),
2991                    Arc::new(ints) as ArrayRef,
2992                ),
2993            ]);
2994
2995            let schema = Schema::new(vec![Field::new(
2996                "struct_array",
2997                struct_array.data_type().clone(),
2998                true,
2999            )]);
3000
3001            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(struct_array)]).unwrap()
3002        }
3003
3004        let record_batch = create_batch();
3005        let record_batch_slice = record_batch.slice(1, 2);
3006        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
3007
3008        assert!(
3009            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
3010        );
3011
3012        let structs = deserialized_batch
3013            .column(0)
3014            .as_any()
3015            .downcast_ref::<StructArray>()
3016            .unwrap();
3017
3018        assert!(structs.column(0).is_null(0));
3019        assert!(structs.column(0).is_valid(1));
3020        assert!(structs.column(1).is_valid(0));
3021        assert!(structs.column(1).is_null(1));
3022        assert_eq!(record_batch_slice, deserialized_batch);
3023    }
3024
3025    #[test]
3026    fn truncate_ipc_string_array_with_all_empty_string() {
3027        fn create_batch() -> RecordBatch {
3028            let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
3029            let a = StringArray::from(vec![Some(""), Some(""), Some(""), Some(""), Some("")]);
3030            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap()
3031        }
3032
3033        let record_batch = create_batch();
3034        let record_batch_slice = record_batch.slice(0, 1);
3035        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
3036
3037        assert!(
3038            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
3039        );
3040        assert_eq!(record_batch_slice, deserialized_batch);
3041    }
3042
3043    #[test]
3044    fn test_stream_writer_writes_array_slice() {
3045        let array = UInt32Array::from(vec![Some(1), Some(2), Some(3)]);
3046        assert_eq!(
3047            vec![Some(1), Some(2), Some(3)],
3048            array.iter().collect::<Vec<_>>()
3049        );
3050
3051        let sliced = array.slice(1, 2);
3052        assert_eq!(vec![Some(2), Some(3)], sliced.iter().collect::<Vec<_>>());
3053
3054        let batch = RecordBatch::try_new(
3055            Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)])),
3056            vec![Arc::new(sliced)],
3057        )
3058        .expect("new batch");
3059
3060        let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
3061        writer.write(&batch).expect("write");
3062        let outbuf = writer.into_inner().expect("inner");
3063
3064        let mut reader = StreamReader::try_new(&outbuf[..], None).expect("new reader");
3065        let read_batch = reader.next().unwrap().expect("read batch");
3066
3067        let read_array: &UInt32Array = read_batch.column(0).as_primitive();
3068        assert_eq!(
3069            vec![Some(2), Some(3)],
3070            read_array.iter().collect::<Vec<_>>()
3071        );
3072    }
3073
3074    #[test]
3075    fn test_large_slice_uint32() {
3076        ensure_roundtrip(Arc::new(UInt32Array::from_iter(
3077            (0..8000).map(|i| if i % 2 == 0 { Some(i) } else { None }),
3078        )));
3079    }
3080
3081    #[test]
3082    fn test_large_slice_string() {
3083        let strings: Vec<_> = (0..8000)
3084            .map(|i| {
3085                if i % 2 == 0 {
3086                    Some(format!("value{i}"))
3087                } else {
3088                    None
3089                }
3090            })
3091            .collect();
3092
3093        ensure_roundtrip(Arc::new(StringArray::from(strings)));
3094    }
3095
3096    #[test]
3097    fn test_large_slice_string_list() {
3098        let mut ls = ListBuilder::new(StringBuilder::new());
3099
3100        let mut s = String::new();
3101        for row_number in 0..8000 {
3102            if row_number % 2 == 0 {
3103                for list_element in 0..1000 {
3104                    s.clear();
3105                    use std::fmt::Write;
3106                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
3107                    ls.values().append_value(&s);
3108                }
3109                ls.append(true)
3110            } else {
3111                ls.append(false); // null
3112            }
3113        }
3114
3115        ensure_roundtrip(Arc::new(ls.finish()));
3116    }
3117
3118    #[test]
3119    fn test_large_slice_string_list_of_lists() {
3120        // The reason for the special test is to verify reencode_offsets which looks both at
3121        // the starting offset and the data offset.  So need a dataset where the starting_offset
3122        // is zero but the data offset is not.
3123        let mut ls = ListBuilder::new(ListBuilder::new(StringBuilder::new()));
3124
3125        for _ in 0..4000 {
3126            ls.values().append(true);
3127            ls.append(true)
3128        }
3129
3130        let mut s = String::new();
3131        for row_number in 0..4000 {
3132            if row_number % 2 == 0 {
3133                for list_element in 0..1000 {
3134                    s.clear();
3135                    use std::fmt::Write;
3136                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
3137                    ls.values().values().append_value(&s);
3138                }
3139                ls.values().append(true);
3140                ls.append(true)
3141            } else {
3142                ls.append(false); // null
3143            }
3144        }
3145
3146        ensure_roundtrip(Arc::new(ls.finish()));
3147    }
3148
3149    /// Read/write a record batch to a File and Stream and ensure it is the same at the outout
3150    fn ensure_roundtrip(array: ArrayRef) {
3151        let num_rows = array.len();
3152        let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
3153        // take off the first element
3154        let sliced_batch = orig_batch.slice(1, num_rows - 1);
3155
3156        let schema = orig_batch.schema();
3157        let stream_data = {
3158            let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
3159            writer.write(&sliced_batch).unwrap();
3160            writer.into_inner().unwrap()
3161        };
3162        let read_batch = {
3163            let projection = None;
3164            let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
3165            reader
3166                .next()
3167                .expect("expect no errors reading batch")
3168                .expect("expect batch")
3169        };
3170        assert_eq!(sliced_batch, read_batch);
3171
3172        let file_data = {
3173            let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
3174            writer.write(&sliced_batch).unwrap();
3175            writer.into_inner().unwrap().into_inner().unwrap()
3176        };
3177        let read_batch = {
3178            let projection = None;
3179            let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
3180            reader
3181                .next()
3182                .expect("expect no errors reading batch")
3183                .expect("expect batch")
3184        };
3185        assert_eq!(sliced_batch, read_batch);
3186
3187        // TODO test file writer/reader
3188    }
3189
3190    #[test]
3191    fn encode_bools_slice() {
3192        // Test case for https://github.com/apache/arrow-rs/issues/3496
3193        assert_bool_roundtrip([true, false], 1, 1);
3194
3195        // slice somewhere in the middle
3196        assert_bool_roundtrip(
3197            [
3198                true, false, true, true, false, false, true, true, true, false, false, false, true,
3199                true, true, true, false, false, false, false, true, true, true, true, true, false,
3200                false, false, false, false,
3201            ],
3202            13,
3203            17,
3204        );
3205
3206        // start at byte boundary, end in the middle
3207        assert_bool_roundtrip(
3208            [
3209                true, false, true, true, false, false, true, true, true, false, false, false,
3210            ],
3211            8,
3212            2,
3213        );
3214
3215        // start and stop and byte boundary
3216        assert_bool_roundtrip(
3217            [
3218                true, false, true, true, false, false, true, true, true, false, false, false, true,
3219                true, true, true, true, false, false, false, false, false,
3220            ],
3221            8,
3222            8,
3223        );
3224    }
3225
3226    fn assert_bool_roundtrip<const N: usize>(bools: [bool; N], offset: usize, length: usize) {
3227        let val_bool_field = Field::new("val", DataType::Boolean, false);
3228
3229        let schema = Arc::new(Schema::new(vec![val_bool_field]));
3230
3231        let bools = BooleanArray::from(bools.to_vec());
3232
3233        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(bools)]).unwrap();
3234        let batch = batch.slice(offset, length);
3235
3236        let data = serialize_stream(&batch);
3237        let batch2 = deserialize_stream(data);
3238        assert_eq!(batch, batch2);
3239    }
3240
3241    #[test]
3242    fn test_run_array_unslice() {
3243        let total_len = 80;
3244        let vals: Vec<Option<i32>> = vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
3245        let repeats: Vec<usize> = vec![3, 4, 1, 2];
3246        let mut input_array: Vec<Option<i32>> = Vec::with_capacity(total_len);
3247        for ix in 0_usize..32 {
3248            let repeat: usize = repeats[ix % repeats.len()];
3249            let val: Option<i32> = vals[ix % vals.len()];
3250            input_array.resize(input_array.len() + repeat, val);
3251        }
3252
3253        // Encode the input_array to run array
3254        let mut builder =
3255            PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
3256        builder.extend(input_array.iter().copied());
3257        let run_array = builder.finish();
3258
3259        // test for all slice lengths.
3260        for slice_len in 1..=total_len {
3261            // test for offset = 0, slice length = slice_len
3262            let sliced_run_array: RunArray<Int16Type> =
3263                run_array.slice(0, slice_len).into_data().into();
3264
3265            // Create unsliced run array.
3266            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
3267            let typed = unsliced_run_array
3268                .downcast::<PrimitiveArray<Int32Type>>()
3269                .unwrap();
3270            let expected: Vec<Option<i32>> = input_array.iter().take(slice_len).copied().collect();
3271            let actual: Vec<Option<i32>> = typed.into_iter().collect();
3272            assert_eq!(expected, actual);
3273
3274            // test for offset = total_len - slice_len, length = slice_len
3275            let sliced_run_array: RunArray<Int16Type> = run_array
3276                .slice(total_len - slice_len, slice_len)
3277                .into_data()
3278                .into();
3279
3280            // Create unsliced run array.
3281            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
3282            let typed = unsliced_run_array
3283                .downcast::<PrimitiveArray<Int32Type>>()
3284                .unwrap();
3285            let expected: Vec<Option<i32>> = input_array
3286                .iter()
3287                .skip(total_len - slice_len)
3288                .copied()
3289                .collect();
3290            let actual: Vec<Option<i32>> = typed.into_iter().collect();
3291            assert_eq!(expected, actual);
3292        }
3293    }
3294
3295    fn generate_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3296        let mut ls = GenericListBuilder::<O, _>::new(UInt32Builder::new());
3297
3298        for i in 0..100_000 {
3299            for value in [i, i, i] {
3300                ls.values().append_value(value);
3301            }
3302            ls.append(true)
3303        }
3304
3305        ls.finish()
3306    }
3307
3308    fn generate_utf8view_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3309        let mut ls = GenericListBuilder::<O, _>::new(StringViewBuilder::new());
3310
3311        for i in 0..100_000 {
3312            for value in [
3313                format!("value{}", i),
3314                format!("value{}", i),
3315                format!("value{}", i),
3316            ] {
3317                ls.values().append_value(&value);
3318            }
3319            ls.append(true)
3320        }
3321
3322        ls.finish()
3323    }
3324
3325    fn generate_string_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3326        let mut ls = GenericListBuilder::<O, _>::new(StringBuilder::new());
3327
3328        for i in 0..100_000 {
3329            for value in [
3330                format!("value{}", i),
3331                format!("value{}", i),
3332                format!("value{}", i),
3333            ] {
3334                ls.values().append_value(&value);
3335            }
3336            ls.append(true)
3337        }
3338
3339        ls.finish()
3340    }
3341
3342    fn generate_nested_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3343        let mut ls =
3344            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
3345
3346        for _i in 0..10_000 {
3347            for j in 0..10 {
3348                for value in [j, j, j, j] {
3349                    ls.values().values().append_value(value);
3350                }
3351                ls.values().append(true)
3352            }
3353            ls.append(true);
3354        }
3355
3356        ls.finish()
3357    }
3358
3359    fn generate_nested_list_data_starting_at_zero<O: OffsetSizeTrait>() -> GenericListArray<O> {
3360        let mut ls =
3361            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
3362
3363        for _i in 0..999 {
3364            ls.values().append(true);
3365            ls.append(true);
3366        }
3367
3368        for j in 0..10 {
3369            for value in [j, j, j, j] {
3370                ls.values().values().append_value(value);
3371            }
3372            ls.values().append(true)
3373        }
3374        ls.append(true);
3375
3376        for i in 0..9_000 {
3377            for j in 0..10 {
3378                for value in [i + j, i + j, i + j, i + j] {
3379                    ls.values().values().append_value(value);
3380                }
3381                ls.values().append(true)
3382            }
3383            ls.append(true);
3384        }
3385
3386        ls.finish()
3387    }
3388
3389    fn generate_map_array_data() -> MapArray {
3390        let keys_builder = UInt32Builder::new();
3391        let values_builder = UInt32Builder::new();
3392
3393        let mut builder = MapBuilder::new(None, keys_builder, values_builder);
3394
3395        for i in 0..100_000 {
3396            for _j in 0..3 {
3397                builder.keys().append_value(i);
3398                builder.values().append_value(i * 2);
3399            }
3400            builder.append(true).unwrap();
3401        }
3402
3403        builder.finish()
3404    }
3405
3406    #[test]
3407    fn reencode_offsets_when_first_offset_is_not_zero() {
3408        let original_list = generate_list_data::<i32>();
3409        let original_data = original_list.into_data();
3410        let slice_data = original_data.slice(75, 7);
3411        let (new_offsets, original_start, length) =
3412            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
3413        assert_eq!(
3414            vec![0, 3, 6, 9, 12, 15, 18, 21],
3415            new_offsets.typed_data::<i32>()
3416        );
3417        assert_eq!(225, original_start);
3418        assert_eq!(21, length);
3419    }
3420
3421    #[test]
3422    fn reencode_offsets_when_first_offset_is_zero() {
3423        let mut ls = GenericListBuilder::<i32, _>::new(UInt32Builder::new());
3424        // ls = [[], [35, 42]
3425        ls.append(true);
3426        ls.values().append_value(35);
3427        ls.values().append_value(42);
3428        ls.append(true);
3429        let original_list = ls.finish();
3430        let original_data = original_list.into_data();
3431
3432        let slice_data = original_data.slice(1, 1);
3433        let (new_offsets, original_start, length) =
3434            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
3435        assert_eq!(vec![0, 2], new_offsets.typed_data::<i32>());
3436        assert_eq!(0, original_start);
3437        assert_eq!(2, length);
3438    }
3439
3440    /// Ensure when serde full & sliced versions they are equal to original input.
3441    /// Also ensure serialized sliced version is significantly smaller than serialized full.
3442    fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {
3443        // test both full and sliced versions
3444        let in_sliced = in_batch.slice(999, 1);
3445
3446        let bytes_batch = serialize_file(&in_batch);
3447        let bytes_sliced = serialize_file(&in_sliced);
3448
3449        // serializing 1 row should be significantly smaller than serializing 100,000
3450        assert!(bytes_sliced.len() < (bytes_batch.len() / expected_size_factor));
3451
3452        // ensure both are still valid and equal to originals
3453        let out_batch = deserialize_file(bytes_batch);
3454        assert_eq!(in_batch, out_batch);
3455
3456        let out_sliced = deserialize_file(bytes_sliced);
3457        assert_eq!(in_sliced, out_sliced);
3458    }
3459
3460    #[test]
3461    fn encode_lists() {
3462        let val_inner = Field::new_list_field(DataType::UInt32, true);
3463        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
3464        let schema = Arc::new(Schema::new(vec![val_list_field]));
3465
3466        let values = Arc::new(generate_list_data::<i32>());
3467
3468        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3469        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3470    }
3471
3472    #[test]
3473    fn encode_empty_list() {
3474        let val_inner = Field::new_list_field(DataType::UInt32, true);
3475        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
3476        let schema = Arc::new(Schema::new(vec![val_list_field]));
3477
3478        let values = Arc::new(generate_list_data::<i32>());
3479
3480        let in_batch = RecordBatch::try_new(schema, vec![values])
3481            .unwrap()
3482            .slice(999, 0);
3483        let out_batch = deserialize_file(serialize_file(&in_batch));
3484        assert_eq!(in_batch, out_batch);
3485    }
3486
3487    #[test]
3488    fn encode_large_lists() {
3489        let val_inner = Field::new_list_field(DataType::UInt32, true);
3490        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3491        let schema = Arc::new(Schema::new(vec![val_list_field]));
3492
3493        let values = Arc::new(generate_list_data::<i64>());
3494
3495        // ensure when serde full & sliced versions they are equal to original input
3496        // also ensure serialized sliced version is significantly smaller than serialized full
3497        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3498        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3499    }
3500
3501    #[test]
3502    fn encode_large_lists_non_zero_offset() {
3503        let val_inner = Field::new_list_field(DataType::UInt32, true);
3504        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3505        let schema = Arc::new(Schema::new(vec![val_list_field]));
3506
3507        let values = Arc::new(generate_list_data::<i64>());
3508
3509        check_sliced_list_array(schema, values);
3510    }
3511
3512    #[test]
3513    fn encode_large_lists_string_non_zero_offset() {
3514        let val_inner = Field::new_list_field(DataType::Utf8, true);
3515        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3516        let schema = Arc::new(Schema::new(vec![val_list_field]));
3517
3518        let values = Arc::new(generate_string_list_data::<i64>());
3519
3520        check_sliced_list_array(schema, values);
3521    }
3522
3523    #[test]
3524    fn encode_large_list_string_view_non_zero_offset() {
3525        let val_inner = Field::new_list_field(DataType::Utf8View, true);
3526        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3527        let schema = Arc::new(Schema::new(vec![val_list_field]));
3528
3529        let values = Arc::new(generate_utf8view_list_data::<i64>());
3530
3531        check_sliced_list_array(schema, values);
3532    }
3533
3534    fn check_sliced_list_array(schema: Arc<Schema>, values: Arc<GenericListArray<i64>>) {
3535        for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3536            let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3537                .unwrap()
3538                .slice(offset, len);
3539            let out_batch = deserialize_file(serialize_file(&in_batch));
3540            assert_eq!(in_batch, out_batch);
3541        }
3542    }
3543
3544    #[test]
3545    fn encode_nested_lists() {
3546        let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
3547        let inner_list_field = Arc::new(Field::new_list_field(DataType::List(inner_int), true));
3548        let list_field = Field::new("val", DataType::List(inner_list_field), true);
3549        let schema = Arc::new(Schema::new(vec![list_field]));
3550
3551        let values = Arc::new(generate_nested_list_data::<i32>());
3552
3553        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3554        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3555    }
3556
3557    #[test]
3558    fn encode_nested_lists_starting_at_zero() {
3559        let inner_int = Arc::new(Field::new("item", DataType::UInt32, true));
3560        let inner_list_field = Arc::new(Field::new("item", DataType::List(inner_int), true));
3561        let list_field = Field::new("val", DataType::List(inner_list_field), true);
3562        let schema = Arc::new(Schema::new(vec![list_field]));
3563
3564        let values = Arc::new(generate_nested_list_data_starting_at_zero::<i32>());
3565
3566        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3567        roundtrip_ensure_sliced_smaller(in_batch, 1);
3568    }
3569
3570    #[test]
3571    fn encode_map_array() {
3572        let keys = Arc::new(Field::new("keys", DataType::UInt32, false));
3573        let values = Arc::new(Field::new("values", DataType::UInt32, true));
3574        let map_field = Field::new_map("map", "entries", keys, values, false, true);
3575        let schema = Arc::new(Schema::new(vec![map_field]));
3576
3577        let values = Arc::new(generate_map_array_data());
3578
3579        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3580        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3581    }
3582
3583    fn generate_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
3584        let mut builder = GenericListViewBuilder::<O, _>::new(UInt32Builder::new());
3585
3586        for i in 0u32..100_000 {
3587            if i.is_multiple_of(10_000) {
3588                builder.append(false);
3589                continue;
3590            }
3591            for value in [i, i, i] {
3592                builder.values().append_value(value);
3593            }
3594            builder.append(true);
3595        }
3596
3597        builder.finish()
3598    }
3599
3600    #[test]
3601    fn encode_list_view_arrays() {
3602        let val_inner = Field::new_list_field(DataType::UInt32, true);
3603        let val_field = Field::new("val", DataType::ListView(Arc::new(val_inner)), true);
3604        let schema = Arc::new(Schema::new(vec![val_field]));
3605
3606        let values = Arc::new(generate_list_view_data::<i32>());
3607
3608        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3609        let out_batch = deserialize_file(serialize_file(&in_batch));
3610        assert_eq!(in_batch, out_batch);
3611    }
3612
3613    #[test]
3614    fn encode_large_list_view_arrays() {
3615        let val_inner = Field::new_list_field(DataType::UInt32, true);
3616        let val_field = Field::new("val", DataType::LargeListView(Arc::new(val_inner)), true);
3617        let schema = Arc::new(Schema::new(vec![val_field]));
3618
3619        let values = Arc::new(generate_list_view_data::<i64>());
3620
3621        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3622        let out_batch = deserialize_file(serialize_file(&in_batch));
3623        assert_eq!(in_batch, out_batch);
3624    }
3625
3626    #[test]
3627    fn check_sliced_list_view_array() {
3628        let inner = Field::new_list_field(DataType::UInt32, true);
3629        let field = Field::new("val", DataType::ListView(Arc::new(inner)), true);
3630        let schema = Arc::new(Schema::new(vec![field]));
3631        let values = Arc::new(generate_list_view_data::<i32>());
3632
3633        for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3634            let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3635                .unwrap()
3636                .slice(offset, len);
3637            let out_batch = deserialize_file(serialize_file(&in_batch));
3638            assert_eq!(in_batch, out_batch);
3639        }
3640    }
3641
3642    #[test]
3643    fn check_sliced_large_list_view_array() {
3644        let inner = Field::new_list_field(DataType::UInt32, true);
3645        let field = Field::new("val", DataType::LargeListView(Arc::new(inner)), true);
3646        let schema = Arc::new(Schema::new(vec![field]));
3647        let values = Arc::new(generate_list_view_data::<i64>());
3648
3649        for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3650            let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3651                .unwrap()
3652                .slice(offset, len);
3653            let out_batch = deserialize_file(serialize_file(&in_batch));
3654            assert_eq!(in_batch, out_batch);
3655        }
3656    }
3657
3658    fn generate_nested_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
3659        let inner_builder = UInt32Builder::new();
3660        let middle_builder = GenericListViewBuilder::<O, _>::new(inner_builder);
3661        let mut outer_builder = GenericListViewBuilder::<O, _>::new(middle_builder);
3662
3663        for i in 0u32..10_000 {
3664            if i.is_multiple_of(1_000) {
3665                outer_builder.append(false);
3666                continue;
3667            }
3668
3669            for _ in 0..3 {
3670                for value in [i, i + 1, i + 2] {
3671                    outer_builder.values().values().append_value(value);
3672                }
3673                outer_builder.values().append(true);
3674            }
3675            outer_builder.append(true);
3676        }
3677
3678        outer_builder.finish()
3679    }
3680
3681    #[test]
3682    fn encode_nested_list_views() {
3683        let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
3684        let inner_list_field = Arc::new(Field::new_list_field(DataType::ListView(inner_int), true));
3685        let list_field = Field::new("val", DataType::ListView(inner_list_field), true);
3686        let schema = Arc::new(Schema::new(vec![list_field]));
3687
3688        let values = Arc::new(generate_nested_list_view_data::<i32>());
3689
3690        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3691        let out_batch = deserialize_file(serialize_file(&in_batch));
3692        assert_eq!(in_batch, out_batch);
3693    }
3694
3695    fn test_roundtrip_list_view_of_dict_impl<OffsetSize: OffsetSizeTrait, U: ArrowNativeType>(
3696        list_data_type: DataType,
3697        offsets: &[U; 5],
3698        sizes: &[U; 4],
3699    ) {
3700        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3701        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
3702        let dict_array = DictionaryArray::new(keys, Arc::new(values));
3703        let dict_data = dict_array.to_data();
3704
3705        let value_offsets = Buffer::from_slice_ref(offsets);
3706        let value_sizes = Buffer::from_slice_ref(sizes);
3707
3708        let list_data = ArrayData::builder(list_data_type)
3709            .len(4)
3710            .add_buffer(value_offsets)
3711            .add_buffer(value_sizes)
3712            .add_child_data(dict_data)
3713            .build()
3714            .unwrap();
3715        let list_view_array = GenericListViewArray::<OffsetSize>::from(list_data);
3716
3717        let schema = Arc::new(Schema::new(vec![Field::new(
3718            "f1",
3719            list_view_array.data_type().clone(),
3720            false,
3721        )]));
3722        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(list_view_array)]).unwrap();
3723
3724        let output_batch = deserialize_file(serialize_file(&input_batch));
3725        assert_eq!(input_batch, output_batch);
3726
3727        let output_batch = deserialize_stream(serialize_stream(&input_batch));
3728        assert_eq!(input_batch, output_batch);
3729    }
3730
3731    #[test]
3732    fn test_roundtrip_list_view_of_dict() {
3733        #[allow(deprecated)]
3734        let list_data_type = DataType::ListView(Arc::new(Field::new_dict(
3735            "item",
3736            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3737            true,
3738            1,
3739            false,
3740        )));
3741        let offsets: &[i32; 5] = &[0, 2, 4, 4, 7];
3742        let sizes: &[i32; 4] = &[2, 2, 0, 3];
3743        test_roundtrip_list_view_of_dict_impl::<i32, i32>(list_data_type, offsets, sizes);
3744    }
3745
3746    #[test]
3747    fn test_roundtrip_large_list_view_of_dict() {
3748        #[allow(deprecated)]
3749        let list_data_type = DataType::LargeListView(Arc::new(Field::new_dict(
3750            "item",
3751            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3752            true,
3753            2,
3754            false,
3755        )));
3756        let offsets: &[i64; 5] = &[0, 2, 4, 4, 7];
3757        let sizes: &[i64; 4] = &[2, 2, 0, 3];
3758        test_roundtrip_list_view_of_dict_impl::<i64, i64>(list_data_type, offsets, sizes);
3759    }
3760
3761    #[test]
3762    fn test_roundtrip_sliced_list_view_of_dict() {
3763        #[allow(deprecated)]
3764        let list_data_type = DataType::ListView(Arc::new(Field::new_dict(
3765            "item",
3766            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3767            true,
3768            3,
3769            false,
3770        )));
3771
3772        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3773        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2, 1, 0, 3, 2, 1]);
3774        let dict_array = DictionaryArray::new(keys, Arc::new(values));
3775        let dict_data = dict_array.to_data();
3776
3777        let offsets: &[i32; 7] = &[0, 2, 4, 4, 7, 9, 12];
3778        let sizes: &[i32; 6] = &[2, 2, 0, 3, 2, 3];
3779        let value_offsets = Buffer::from_slice_ref(offsets);
3780        let value_sizes = Buffer::from_slice_ref(sizes);
3781
3782        let list_data = ArrayData::builder(list_data_type)
3783            .len(6)
3784            .add_buffer(value_offsets)
3785            .add_buffer(value_sizes)
3786            .add_child_data(dict_data)
3787            .build()
3788            .unwrap();
3789        let list_view_array = GenericListViewArray::<i32>::from(list_data);
3790
3791        let schema = Arc::new(Schema::new(vec![Field::new(
3792            "f1",
3793            list_view_array.data_type().clone(),
3794            false,
3795        )]));
3796        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(list_view_array)]).unwrap();
3797
3798        let sliced_batch = input_batch.slice(1, 4);
3799
3800        let output_batch = deserialize_file(serialize_file(&sliced_batch));
3801        assert_eq!(sliced_batch, output_batch);
3802
3803        let output_batch = deserialize_stream(serialize_stream(&sliced_batch));
3804        assert_eq!(sliced_batch, output_batch);
3805    }
3806
3807    #[test]
3808    fn test_roundtrip_dense_union_of_dict() {
3809        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3810        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
3811        let dict_array = DictionaryArray::new(keys, Arc::new(values));
3812
3813        #[allow(deprecated)]
3814        let dict_field = Arc::new(Field::new_dict(
3815            "dict",
3816            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3817            true,
3818            1,
3819            false,
3820        ));
3821        let int_field = Arc::new(Field::new("int", DataType::Int32, false));
3822        let union_fields = UnionFields::try_new(vec![0, 1], vec![dict_field, int_field]).unwrap();
3823
3824        let types = ScalarBuffer::from(vec![0i8, 0, 1, 0, 1, 0, 0]);
3825        let offsets = ScalarBuffer::from(vec![0i32, 1, 0, 2, 1, 3, 4]);
3826
3827        let int_array = Int32Array::from(vec![100, 200]);
3828
3829        let union = UnionArray::try_new(
3830            union_fields.clone(),
3831            types,
3832            Some(offsets),
3833            vec![Arc::new(dict_array), Arc::new(int_array)],
3834        )
3835        .unwrap();
3836
3837        let schema = Arc::new(Schema::new(vec![Field::new(
3838            "union",
3839            DataType::Union(union_fields, UnionMode::Dense),
3840            false,
3841        )]));
3842        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
3843
3844        let output_batch = deserialize_file(serialize_file(&input_batch));
3845        assert_eq!(input_batch, output_batch);
3846
3847        let output_batch = deserialize_stream(serialize_stream(&input_batch));
3848        assert_eq!(input_batch, output_batch);
3849    }
3850
3851    #[test]
3852    fn test_roundtrip_sparse_union_of_dict() {
3853        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3854        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
3855        let dict_array = DictionaryArray::new(keys, Arc::new(values));
3856
3857        #[allow(deprecated)]
3858        let dict_field = Arc::new(Field::new_dict(
3859            "dict",
3860            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3861            true,
3862            2,
3863            false,
3864        ));
3865        let int_field = Arc::new(Field::new("int", DataType::Int32, false));
3866        let union_fields = UnionFields::try_new(vec![0, 1], vec![dict_field, int_field]).unwrap();
3867
3868        let types = ScalarBuffer::from(vec![0i8, 0, 1, 0, 1, 0, 0]);
3869
3870        let int_array = Int32Array::from(vec![0, 0, 100, 0, 200, 0, 0]);
3871
3872        let union = UnionArray::try_new(
3873            union_fields.clone(),
3874            types,
3875            None,
3876            vec![Arc::new(dict_array), Arc::new(int_array)],
3877        )
3878        .unwrap();
3879
3880        let schema = Arc::new(Schema::new(vec![Field::new(
3881            "union",
3882            DataType::Union(union_fields, UnionMode::Sparse),
3883            false,
3884        )]));
3885        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
3886
3887        let output_batch = deserialize_file(serialize_file(&input_batch));
3888        assert_eq!(input_batch, output_batch);
3889
3890        let output_batch = deserialize_stream(serialize_stream(&input_batch));
3891        assert_eq!(input_batch, output_batch);
3892    }
3893
3894    #[test]
3895    fn test_roundtrip_map_with_dict_keys() {
3896        // Building a map array is a bit involved. We first build a struct arary that has a key and
3897        // value field and then use that to build the actual map array.
3898        let key_values = StringArray::from(vec!["key_a", "key_b", "key_c"]);
3899        let keys = Int32Array::from_iter_values([0, 1, 2, 0, 1, 0]);
3900        let dict_keys = DictionaryArray::new(keys, Arc::new(key_values));
3901
3902        let values = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
3903
3904        #[allow(deprecated)]
3905        let entries_field = Arc::new(Field::new(
3906            "entries",
3907            DataType::Struct(
3908                vec![
3909                    Field::new_dict(
3910                        "key",
3911                        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3912                        false,
3913                        1,
3914                        false,
3915                    ),
3916                    Field::new("value", DataType::Int32, true),
3917                ]
3918                .into(),
3919            ),
3920            false,
3921        ));
3922
3923        let entries = StructArray::from(vec![
3924            (
3925                Arc::new(Field::new(
3926                    "key",
3927                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3928                    false,
3929                )),
3930                Arc::new(dict_keys) as ArrayRef,
3931            ),
3932            (
3933                Arc::new(Field::new("value", DataType::Int32, true)),
3934                Arc::new(values) as ArrayRef,
3935            ),
3936        ]);
3937
3938        let offsets = Buffer::from_slice_ref([0i32, 2, 4, 6]);
3939
3940        let map_data = ArrayData::builder(DataType::Map(entries_field, false))
3941            .len(3)
3942            .add_buffer(offsets)
3943            .add_child_data(entries.into_data())
3944            .build()
3945            .unwrap();
3946        let map_array = MapArray::from(map_data);
3947
3948        let schema = Arc::new(Schema::new(vec![Field::new(
3949            "map",
3950            map_array.data_type().clone(),
3951            false,
3952        )]));
3953        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(map_array)]).unwrap();
3954
3955        let output_batch = deserialize_file(serialize_file(&input_batch));
3956        assert_eq!(input_batch, output_batch);
3957
3958        let output_batch = deserialize_stream(serialize_stream(&input_batch));
3959        assert_eq!(input_batch, output_batch);
3960    }
3961
3962    #[test]
3963    fn test_roundtrip_map_with_dict_values() {
3964        // Building a map array is a bit involved. We first build a struct arary that has a key and
3965        // value field and then use that to build the actual map array.
3966        let keys = StringArray::from(vec!["a", "b", "c", "d", "e", "f"]);
3967
3968        let value_values = StringArray::from(vec!["val_x", "val_y", "val_z"]);
3969        let value_keys = Int32Array::from_iter_values([0, 1, 2, 0, 1, 0]);
3970        let dict_values = DictionaryArray::new(value_keys, Arc::new(value_values));
3971
3972        #[allow(deprecated)]
3973        let entries_field = Arc::new(Field::new(
3974            "entries",
3975            DataType::Struct(
3976                vec![
3977                    Field::new("key", DataType::Utf8, false),
3978                    Field::new_dict(
3979                        "value",
3980                        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3981                        true,
3982                        2,
3983                        false,
3984                    ),
3985                ]
3986                .into(),
3987            ),
3988            false,
3989        ));
3990
3991        let entries = StructArray::from(vec![
3992            (
3993                Arc::new(Field::new("key", DataType::Utf8, false)),
3994                Arc::new(keys) as ArrayRef,
3995            ),
3996            (
3997                Arc::new(Field::new(
3998                    "value",
3999                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
4000                    true,
4001                )),
4002                Arc::new(dict_values) as ArrayRef,
4003            ),
4004        ]);
4005
4006        let offsets = Buffer::from_slice_ref([0i32, 2, 4, 6]);
4007
4008        let map_data = ArrayData::builder(DataType::Map(entries_field, false))
4009            .len(3)
4010            .add_buffer(offsets)
4011            .add_child_data(entries.into_data())
4012            .build()
4013            .unwrap();
4014        let map_array = MapArray::from(map_data);
4015
4016        let schema = Arc::new(Schema::new(vec![Field::new(
4017            "map",
4018            map_array.data_type().clone(),
4019            false,
4020        )]));
4021        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(map_array)]).unwrap();
4022
4023        let output_batch = deserialize_file(serialize_file(&input_batch));
4024        assert_eq!(input_batch, output_batch);
4025
4026        let output_batch = deserialize_stream(serialize_stream(&input_batch));
4027        assert_eq!(input_batch, output_batch);
4028    }
4029
4030    #[test]
4031    fn test_decimal128_alignment16_is_sufficient() {
4032        const IPC_ALIGNMENT: usize = 16;
4033
4034        // Test a bunch of different dimensions to ensure alignment is never an issue.
4035        // For example, if we only test `num_cols = 1` then even with alignment 8 this
4036        // test would _happen_ to pass, even though for different dimensions like
4037        // `num_cols = 2` it would fail.
4038        for num_cols in [1, 2, 3, 17, 50, 73, 99] {
4039            let num_rows = (num_cols * 7 + 11) % 100; // Deterministic swizzle
4040
4041            let mut fields = Vec::new();
4042            let mut arrays = Vec::new();
4043            for i in 0..num_cols {
4044                let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
4045                let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
4046                fields.push(field);
4047                arrays.push(Arc::new(array) as Arc<dyn Array>);
4048            }
4049            let schema = Schema::new(fields);
4050            let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();
4051
4052            let mut writer = FileWriter::try_new_with_options(
4053                Vec::new(),
4054                batch.schema_ref(),
4055                IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
4056            )
4057            .unwrap();
4058            writer.write(&batch).unwrap();
4059            writer.finish().unwrap();
4060
4061            let out: Vec<u8> = writer.into_inner().unwrap();
4062
4063            let buffer = Buffer::from_vec(out);
4064            let trailer_start = buffer.len() - 10;
4065            let footer_len =
4066                read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
4067            let footer =
4068                root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
4069
4070            let schema = fb_to_schema(footer.schema().unwrap());
4071
4072            // Importantly we set `require_alignment`, checking that 16-byte alignment is sufficient
4073            // for `read_record_batch` later on to read the data in a zero-copy manner.
4074            let decoder =
4075                FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);
4076
4077            let batches = footer.recordBatches().unwrap();
4078
4079            let block = batches.get(0);
4080            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
4081            let data = buffer.slice_with_length(block.offset() as _, block_len);
4082
4083            let batch2 = decoder.read_record_batch(block, &data).unwrap().unwrap();
4084
4085            assert_eq!(batch, batch2);
4086        }
4087    }
4088
4089    #[test]
4090    fn test_decimal128_alignment8_is_unaligned() {
4091        const IPC_ALIGNMENT: usize = 8;
4092
4093        let num_cols = 2;
4094        let num_rows = 1;
4095
4096        let mut fields = Vec::new();
4097        let mut arrays = Vec::new();
4098        for i in 0..num_cols {
4099            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
4100            let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
4101            fields.push(field);
4102            arrays.push(Arc::new(array) as Arc<dyn Array>);
4103        }
4104        let schema = Schema::new(fields);
4105        let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();
4106
4107        let mut writer = FileWriter::try_new_with_options(
4108            Vec::new(),
4109            batch.schema_ref(),
4110            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
4111        )
4112        .unwrap();
4113        writer.write(&batch).unwrap();
4114        writer.finish().unwrap();
4115
4116        let out: Vec<u8> = writer.into_inner().unwrap();
4117
4118        let buffer = Buffer::from_vec(out);
4119        let trailer_start = buffer.len() - 10;
4120        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
4121        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
4122        let schema = fb_to_schema(footer.schema().unwrap());
4123
4124        // Importantly we set `require_alignment`, otherwise the error later is suppressed due to copying
4125        // to an aligned buffer in `ArrayDataBuilder.build_aligned`.
4126        let decoder =
4127            FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);
4128
4129        let batches = footer.recordBatches().unwrap();
4130
4131        let block = batches.get(0);
4132        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
4133        let data = buffer.slice_with_length(block.offset() as _, block_len);
4134
4135        let result = decoder.read_record_batch(block, &data);
4136
4137        let error = result.unwrap_err();
4138        assert_eq!(
4139            error.to_string(),
4140            "Invalid argument error: Misaligned buffers[0] in array of type Decimal128(38, 10), \
4141             offset from expected alignment of 16 by 8"
4142        );
4143    }
4144
4145    #[test]
4146    fn test_flush() {
4147        // We write a schema which is small enough to fit into a buffer and not get flushed,
4148        // and then force the write with .flush().
4149        let num_cols = 2;
4150        let mut fields = Vec::new();
4151        let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
4152        for i in 0..num_cols {
4153            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
4154            fields.push(field);
4155        }
4156        let schema = Schema::new(fields);
4157        let inner_stream_writer = BufWriter::with_capacity(1024, Vec::new());
4158        let inner_file_writer = BufWriter::with_capacity(1024, Vec::new());
4159        let mut stream_writer =
4160            StreamWriter::try_new_with_options(inner_stream_writer, &schema, options.clone())
4161                .unwrap();
4162        let mut file_writer =
4163            FileWriter::try_new_with_options(inner_file_writer, &schema, options).unwrap();
4164
4165        let stream_bytes_written_on_new = stream_writer.get_ref().get_ref().len();
4166        let file_bytes_written_on_new = file_writer.get_ref().get_ref().len();
4167        stream_writer.flush().unwrap();
4168        file_writer.flush().unwrap();
4169        let stream_bytes_written_on_flush = stream_writer.get_ref().get_ref().len();
4170        let file_bytes_written_on_flush = file_writer.get_ref().get_ref().len();
4171        let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
4172        // Finishing a stream writes the continuation bytes in MetadataVersion::V5 (4 bytes)
4173        // and then a length of 0 (4 bytes) for a total of 8 bytes.
4174        // Everything before that should have been flushed in the .flush() call.
4175        let expected_stream_flushed_bytes = stream_out.len() - 8;
4176        // A file write is the same as the stream write except for the leading magic string
4177        // ARROW1 plus padding, which is 8 bytes.
4178        let expected_file_flushed_bytes = expected_stream_flushed_bytes + 8;
4179
4180        assert!(
4181            stream_bytes_written_on_new < stream_bytes_written_on_flush,
4182            "this test makes no sense if flush is not actually required"
4183        );
4184        assert!(
4185            file_bytes_written_on_new < file_bytes_written_on_flush,
4186            "this test makes no sense if flush is not actually required"
4187        );
4188        assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
4189        assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
4190    }
4191
4192    #[test]
4193    fn test_roundtrip_list_of_fixed_list() -> Result<(), ArrowError> {
4194        let l1_type =
4195            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, false)), 3);
4196        let l2_type = DataType::List(Arc::new(Field::new("item", l1_type.clone(), false)));
4197
4198        let l0_builder = Float32Builder::new();
4199        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3).with_field(Arc::new(Field::new(
4200            "item",
4201            DataType::Float32,
4202            false,
4203        )));
4204        let mut l2_builder =
4205            ListBuilder::new(l1_builder).with_field(Arc::new(Field::new("item", l1_type, false)));
4206
4207        for point in [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]] {
4208            l2_builder.values().values().append_value(point[0]);
4209            l2_builder.values().values().append_value(point[1]);
4210            l2_builder.values().values().append_value(point[2]);
4211
4212            l2_builder.values().append(true);
4213        }
4214        l2_builder.append(true);
4215
4216        let point = [10., 11., 12.];
4217        l2_builder.values().values().append_value(point[0]);
4218        l2_builder.values().values().append_value(point[1]);
4219        l2_builder.values().values().append_value(point[2]);
4220
4221        l2_builder.values().append(true);
4222        l2_builder.append(true);
4223
4224        let array = Arc::new(l2_builder.finish()) as ArrayRef;
4225
4226        let schema = Arc::new(Schema::new_with_metadata(
4227            vec![Field::new("points", l2_type, false)],
4228            HashMap::default(),
4229        ));
4230
4231        // Test a variety of combinations that include 0 and non-zero offsets
4232        // and also portions or the rest of the array
4233        test_slices(&array, &schema, 0, 1)?;
4234        test_slices(&array, &schema, 0, 2)?;
4235        test_slices(&array, &schema, 1, 1)?;
4236
4237        Ok(())
4238    }
4239
4240    #[test]
4241    fn test_roundtrip_list_of_fixed_list_w_nulls() -> Result<(), ArrowError> {
4242        let l0_builder = Float32Builder::new();
4243        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3);
4244        let mut l2_builder = ListBuilder::new(l1_builder);
4245
4246        for point in [
4247            [Some(1.0), Some(2.0), None],
4248            [Some(4.0), Some(5.0), Some(6.0)],
4249            [None, Some(8.0), Some(9.0)],
4250        ] {
4251            for p in point {
4252                match p {
4253                    Some(p) => l2_builder.values().values().append_value(p),
4254                    None => l2_builder.values().values().append_null(),
4255                }
4256            }
4257
4258            l2_builder.values().append(true);
4259        }
4260        l2_builder.append(true);
4261
4262        let point = [Some(10.), None, None];
4263        for p in point {
4264            match p {
4265                Some(p) => l2_builder.values().values().append_value(p),
4266                None => l2_builder.values().values().append_null(),
4267            }
4268        }
4269
4270        l2_builder.values().append(true);
4271        l2_builder.append(true);
4272
4273        let array = Arc::new(l2_builder.finish()) as ArrayRef;
4274
4275        let schema = Arc::new(Schema::new_with_metadata(
4276            vec![Field::new(
4277                "points",
4278                DataType::List(Arc::new(Field::new(
4279                    "item",
4280                    DataType::FixedSizeList(
4281                        Arc::new(Field::new("item", DataType::Float32, true)),
4282                        3,
4283                    ),
4284                    true,
4285                ))),
4286                true,
4287            )],
4288            HashMap::default(),
4289        ));
4290
4291        // Test a variety of combinations that include 0 and non-zero offsets
4292        // and also portions or the rest of the array
4293        test_slices(&array, &schema, 0, 1)?;
4294        test_slices(&array, &schema, 0, 2)?;
4295        test_slices(&array, &schema, 1, 1)?;
4296
4297        Ok(())
4298    }
4299
4300    fn test_slices(
4301        parent_array: &ArrayRef,
4302        schema: &SchemaRef,
4303        offset: usize,
4304        length: usize,
4305    ) -> Result<(), ArrowError> {
4306        let subarray = parent_array.slice(offset, length);
4307        let original_batch = RecordBatch::try_new(schema.clone(), vec![subarray])?;
4308
4309        let mut bytes = Vec::new();
4310        let mut writer = StreamWriter::try_new(&mut bytes, schema)?;
4311        writer.write(&original_batch)?;
4312        writer.finish()?;
4313
4314        let mut cursor = std::io::Cursor::new(bytes);
4315        let mut reader = StreamReader::try_new(&mut cursor, None)?;
4316        let returned_batch = reader.next().unwrap()?;
4317
4318        assert_eq!(original_batch, returned_batch);
4319
4320        Ok(())
4321    }
4322
4323    #[test]
4324    fn test_roundtrip_fixed_list() -> Result<(), ArrowError> {
4325        let int_builder = Int64Builder::new();
4326        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3)
4327            .with_field(Arc::new(Field::new("item", DataType::Int64, false)));
4328
4329        for point in [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]] {
4330            fixed_list_builder.values().append_value(point[0]);
4331            fixed_list_builder.values().append_value(point[1]);
4332            fixed_list_builder.values().append_value(point[2]);
4333
4334            fixed_list_builder.append(true);
4335        }
4336
4337        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;
4338
4339        let schema = Arc::new(Schema::new_with_metadata(
4340            vec![Field::new(
4341                "points",
4342                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, false)), 3),
4343                false,
4344            )],
4345            HashMap::default(),
4346        ));
4347
4348        // Test a variety of combinations that include 0 and non-zero offsets
4349        // and also portions or the rest of the array
4350        test_slices(&array, &schema, 0, 4)?;
4351        test_slices(&array, &schema, 0, 2)?;
4352        test_slices(&array, &schema, 1, 3)?;
4353        test_slices(&array, &schema, 2, 1)?;
4354
4355        Ok(())
4356    }
4357
4358    #[test]
4359    fn test_roundtrip_fixed_list_w_nulls() -> Result<(), ArrowError> {
4360        let int_builder = Int64Builder::new();
4361        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3);
4362
4363        for point in [
4364            [Some(1), Some(2), None],
4365            [Some(4), Some(5), Some(6)],
4366            [None, Some(8), Some(9)],
4367            [Some(10), None, None],
4368        ] {
4369            for p in point {
4370                match p {
4371                    Some(p) => fixed_list_builder.values().append_value(p),
4372                    None => fixed_list_builder.values().append_null(),
4373                }
4374            }
4375
4376            fixed_list_builder.append(true);
4377        }
4378
4379        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;
4380
4381        let schema = Arc::new(Schema::new_with_metadata(
4382            vec![Field::new(
4383                "points",
4384                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 3),
4385                true,
4386            )],
4387            HashMap::default(),
4388        ));
4389
4390        // Test a variety of combinations that include 0 and non-zero offsets
4391        // and also portions or the rest of the array
4392        test_slices(&array, &schema, 0, 4)?;
4393        test_slices(&array, &schema, 0, 2)?;
4394        test_slices(&array, &schema, 1, 3)?;
4395        test_slices(&array, &schema, 2, 1)?;
4396
4397        Ok(())
4398    }
4399
4400    #[test]
4401    fn test_metadata_encoding_ordering() {
4402        fn create_hash() -> u64 {
4403            let metadata: HashMap<String, String> = [
4404                ("a", "1"), //
4405                ("b", "2"), //
4406                ("c", "3"), //
4407                ("d", "4"), //
4408                ("e", "5"), //
4409            ]
4410            .into_iter()
4411            .map(|(k, v)| (k.to_owned(), v.to_owned()))
4412            .collect();
4413
4414            // Set metadata on both the schema and a field within it.
4415            let schema = Arc::new(
4416                Schema::new(vec![
4417                    Field::new("a", DataType::Int64, true).with_metadata(metadata.clone()),
4418                ])
4419                .with_metadata(metadata)
4420                .clone(),
4421            );
4422            let batch = RecordBatch::new_empty(schema.clone());
4423
4424            let mut bytes = Vec::new();
4425            let mut w = StreamWriter::try_new(&mut bytes, batch.schema_ref()).unwrap();
4426            w.write(&batch).unwrap();
4427            w.finish().unwrap();
4428
4429            let mut h = std::hash::DefaultHasher::new();
4430            h.write(&bytes);
4431            h.finish()
4432        }
4433
4434        let expected = create_hash();
4435
4436        // Since there is randomness in the HashMap and we cannot specify our
4437        // own Hasher for the implementation used for metadata, run the above
4438        // code 20x and verify it does not change. This is not perfect but it
4439        // should be good enough.
4440        let all_passed = (0..20).all(|_| create_hash() == expected);
4441        assert!(all_passed);
4442    }
4443
4444    #[test]
4445    fn test_dictionary_tracker_reset() {
4446        let data_gen = IpcDataGenerator::default();
4447        let mut dictionary_tracker = DictionaryTracker::new(false);
4448        let writer_options = IpcWriteOptions::default();
4449        let mut compression_ctx = CompressionContext::default();
4450
4451        let schema = Arc::new(Schema::new(vec![Field::new(
4452            "a",
4453            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
4454            false,
4455        )]));
4456
4457        let mut write_single_batch_stream =
4458            |batch: RecordBatch, dict_tracker: &mut DictionaryTracker| -> Vec<u8> {
4459                let mut buffer = Vec::new();
4460
4461                // create a new IPC stream:
4462                let stream_header = data_gen.schema_to_bytes_with_dictionary_tracker(
4463                    &schema,
4464                    dict_tracker,
4465                    &writer_options,
4466                );
4467                _ = write_message(&mut buffer, stream_header, &writer_options).unwrap();
4468
4469                let (encoded_dicts, encoded_batch) = data_gen
4470                    .encode(&batch, dict_tracker, &writer_options, &mut compression_ctx)
4471                    .unwrap();
4472                for encoded_dict in encoded_dicts {
4473                    _ = write_message(&mut buffer, encoded_dict, &writer_options).unwrap();
4474                }
4475                _ = write_message(&mut buffer, encoded_batch, &writer_options).unwrap();
4476
4477                buffer
4478            };
4479
4480        let batch1 = RecordBatch::try_new(
4481            schema.clone(),
4482            vec![Arc::new(DictionaryArray::new(
4483                UInt8Array::from_iter_values([0]),
4484                Arc::new(StringArray::from_iter_values(["a"])),
4485            ))],
4486        )
4487        .unwrap();
4488        let buffer = write_single_batch_stream(batch1.clone(), &mut dictionary_tracker);
4489
4490        // ensure we can read the stream back
4491        let mut reader = StreamReader::try_new(Cursor::new(buffer), None).unwrap();
4492        let read_batch = reader.next().unwrap().unwrap();
4493        assert_eq!(read_batch, batch1);
4494
4495        // reset the dictionary tracker so it can be used for next stream
4496        dictionary_tracker.clear();
4497
4498        // now write a 2nd stream and ensure we can also read it:
4499        let batch2 = RecordBatch::try_new(
4500            schema.clone(),
4501            vec![Arc::new(DictionaryArray::new(
4502                UInt8Array::from_iter_values([0]),
4503                Arc::new(StringArray::from_iter_values(["a"])),
4504            ))],
4505        )
4506        .unwrap();
4507        let buffer = write_single_batch_stream(batch2.clone(), &mut dictionary_tracker);
4508        let mut reader = StreamReader::try_new(Cursor::new(buffer), None).unwrap();
4509        let read_batch = reader.next().unwrap().unwrap();
4510        assert_eq!(read_batch, batch2);
4511    }
4512}