Skip to main content

arrow_ipc/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Arrow IPC File and Stream Writers
19//!
20//! # Notes
21//!
22//! [`FileWriter`] and [`StreamWriter`] have similar interfaces,
23//! however the [`FileWriter`] expects a reader that supports [`Seek`]ing
24//!
25//! [`Seek`]: std::io::Seek
26
27use std::cmp::min;
28use std::collections::HashMap;
29use std::io::{BufWriter, Write};
30use std::mem::size_of;
31use std::sync::Arc;
32
33use flatbuffers::FlatBufferBuilder;
34
35use arrow_array::builder::BufferBuilder;
36use arrow_array::cast::*;
37use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
38use arrow_array::*;
39use arrow_buffer::bit_util;
40use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ToByteSlice};
41use arrow_data::{ArrayData, ArrayDataBuilder, BufferSpec, layout};
42use arrow_schema::*;
43
44use crate::CONTINUATION_MARKER;
45use crate::compression::CompressionCodec;
46pub use crate::compression::CompressionContext;
47use crate::convert::IpcSchemaEncoder;
48
/// IPC write options used to control the behaviour of the [`IpcDataGenerator`]
///
/// Values are created with [`IpcWriteOptions::try_new`] (which validates the
/// alignment and metadata version) or [`IpcWriteOptions::default`], and can be
/// refined afterwards with [`IpcWriteOptions::try_with_compression`] and
/// [`IpcWriteOptions::with_dictionary_handling`].
#[derive(Debug, Clone)]
pub struct IpcWriteOptions {
    /// Write padding after memory buffers to this multiple of bytes.
    /// Must be 8, 16, 32, or 64 - defaults to 64.
    alignment: u8,
    /// The legacy format is for releases before 0.15.0, and uses metadata V4.
    ///
    /// [`IpcWriteOptions::try_new`] rejects this flag in combination with metadata V5.
    write_legacy_ipc_format: bool,
    /// The metadata version to write. The Rust IPC writer supports V4+
    ///
    /// *Default versions per crate*
    ///
    /// When creating the default IpcWriteOptions, the following metadata versions are used:
    ///
    /// version 2.0.0: V4, with legacy format enabled
    /// version 4.0.0: V5
    metadata_version: crate::MetadataVersion,
    /// Compression, if desired. Will result in a runtime error
    /// if the corresponding feature is not enabled.
    ///
    /// `None` (the default) writes uncompressed buffers.
    batch_compression_type: Option<crate::CompressionType>,
    /// How to handle updating dictionaries in IPC messages
    dictionary_handling: DictionaryHandling,
}
72
/// A single buffer segment ready to be written to the output stream.
///
/// For the uncompressed path the original Arc-backed [`Buffer`] is stored
/// directly (zero copy). For the compressed path the compressed bytes are
/// owned by a scratch `Vec<u8>`.
///
/// Either way the bytes to emit are obtained through `as_slice`, so consumers
/// do not need to distinguish between the two variants.
enum EncodedBuffer {
    /// Uncompressed: Arc-backed reference to the original array buffer.
    Raw(Buffer),
    /// Compressed: owned scratch bytes produced by the codec.
    Compressed(Vec<u8>),
}
84
85impl EncodedBuffer {
86    fn as_slice(&self) -> &[u8] {
87        match self {
88            EncodedBuffer::Raw(b) => b.as_slice(),
89            EncodedBuffer::Compressed(v) => v.as_slice(),
90        }
91    }
92
93    fn len(&self) -> usize {
94        match self {
95            EncodedBuffer::Raw(b) => b.len(),
96            EncodedBuffer::Compressed(v) => v.len(),
97        }
98    }
99}
/// Destination for per-buffer encoded output produced by [`write_array_data`].
///
/// The two variants trade a copy for deferral: `Write` copies every buffer
/// (plus its padding) into one body vector, while `Collect` only stores the
/// encoded segments so the caller can stream them out later.
enum IpcBodySink<'a> {
    /// Serialize buffer bytes (with padding) into a contiguous byte vec.
    Write(&'a mut Vec<u8>),
    /// Accumulate pre-encoded buffer segments for deferred zero-copy streaming.
    Collect(&'a mut Vec<EncodedBuffer>),
}
107impl<'a> IpcBodySink<'a> {
108    /// Writes the encoded buffer to the sink.
109    pub fn write(&mut self, pad_len: usize, buffer: EncodedBuffer) {
110        match self {
111            IpcBodySink::Write(vec) => {
112                vec.extend_from_slice(buffer.as_slice());
113                vec.extend_from_slice(&PADDING[..pad_len]);
114            }
115            IpcBodySink::Collect(vec) => {
116                vec.push(buffer);
117            }
118        }
119    }
120}
121
/// Per-message sizes produced by [`IpcDataGenerator::write`].
///
/// [`FileWriter`] uses these to build the Block index entries required by the IPC footer for
/// random-access reads.
///
/// All sizes are byte counts.
struct IpcWriteMetadata {
    /// Per-dictionary `(padded_header_len, body_len)` for each dictionary batch written
    /// before the record batch, in the order the dictionary batches were written.
    dictionary_block_sizes: Vec<(usize, usize)>,
    /// Flatbuffer header size including continuation prefix and alignment padding.
    padded_header_len: usize,
    /// Total length of the record-batch body including trailing alignment padding.
    body_len: usize,
}
135
136impl IpcWriteOptions {
137    /// Configures compression when writing IPC files.
138    ///
139    /// Will result in a runtime error if the corresponding feature
140    /// is not enabled
141    pub fn try_with_compression(
142        mut self,
143        batch_compression_type: Option<crate::CompressionType>,
144    ) -> Result<Self, ArrowError> {
145        self.batch_compression_type = batch_compression_type;
146
147        if self.batch_compression_type.is_some()
148            && self.metadata_version < crate::MetadataVersion::V5
149        {
150            return Err(ArrowError::InvalidArgumentError(
151                "Compression only supported in metadata v5 and above".to_string(),
152            ));
153        }
154        Ok(self)
155    }
156    /// Try to create IpcWriteOptions, checking for incompatible settings
157    pub fn try_new(
158        alignment: usize,
159        write_legacy_ipc_format: bool,
160        metadata_version: crate::MetadataVersion,
161    ) -> Result<Self, ArrowError> {
162        let is_alignment_valid =
163            alignment == 8 || alignment == 16 || alignment == 32 || alignment == 64;
164        if !is_alignment_valid {
165            return Err(ArrowError::InvalidArgumentError(
166                "Alignment should be 8, 16, 32, or 64.".to_string(),
167            ));
168        }
169        let alignment: u8 = u8::try_from(alignment).expect("range already checked");
170        match metadata_version {
171            crate::MetadataVersion::V1
172            | crate::MetadataVersion::V2
173            | crate::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
174                "Writing IPC metadata version 3 and lower not supported".to_string(),
175            )),
176            #[allow(deprecated)]
177            crate::MetadataVersion::V4 => Ok(Self {
178                alignment,
179                write_legacy_ipc_format,
180                metadata_version,
181                batch_compression_type: None,
182                dictionary_handling: DictionaryHandling::default(),
183            }),
184            crate::MetadataVersion::V5 => {
185                if write_legacy_ipc_format {
186                    Err(ArrowError::InvalidArgumentError(
187                        "Legacy IPC format only supported on metadata version 4".to_string(),
188                    ))
189                } else {
190                    Ok(Self {
191                        alignment,
192                        write_legacy_ipc_format,
193                        metadata_version,
194                        batch_compression_type: None,
195                        dictionary_handling: DictionaryHandling::default(),
196                    })
197                }
198            }
199            z => Err(ArrowError::InvalidArgumentError(format!(
200                "Unsupported crate::MetadataVersion {z:?}"
201            ))),
202        }
203    }
204
205    /// Configure how dictionaries are handled in IPC messages
206    pub fn with_dictionary_handling(mut self, dictionary_handling: DictionaryHandling) -> Self {
207        self.dictionary_handling = dictionary_handling;
208        self
209    }
210}
211
212impl Default for IpcWriteOptions {
213    fn default() -> Self {
214        Self {
215            alignment: 64,
216            write_legacy_ipc_format: false,
217            metadata_version: crate::MetadataVersion::V5,
218            batch_compression_type: None,
219            dictionary_handling: DictionaryHandling::default(),
220        }
221    }
222}
223
#[derive(Debug, Default)]
/// Handles low level details of encoding [`Array`] and [`Schema`] into the
/// [Arrow IPC Format].
///
/// The generator itself holds no state; dictionary bookkeeping lives in the
/// [`DictionaryTracker`] passed to each call.
///
/// # Example
/// ```
/// # fn run() {
/// # use std::sync::Arc;
/// # use arrow_array::UInt64Array;
/// # use arrow_array::RecordBatch;
/// # use arrow_ipc::writer::{CompressionContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
///
/// // Create a record batch
/// let batch = RecordBatch::try_from_iter(vec![
///  ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _)
/// ]).unwrap();
///
/// // Error if dictionary ids are replaced.
/// let error_on_replacement = true;
/// let options = IpcWriteOptions::default();
/// let mut dictionary_tracker = DictionaryTracker::new(error_on_replacement);
///
/// let mut compression_context = CompressionContext::default();
///
/// // encode the batch into zero or more encoded dictionaries
/// // and the data for the actual array.
/// let data_gen = IpcDataGenerator::default();
/// let (encoded_dictionaries, encoded_message) = data_gen
///   .encode(&batch, &mut dictionary_tracker, &options, &mut compression_context)
///   .unwrap();
/// # }
/// ```
///
/// [Arrow IPC Format]: https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc
pub struct IpcDataGenerator {}
259
260impl IpcDataGenerator {
261    /// Converts a schema to an IPC message along with `dictionary_tracker`
262    /// and returns it encoded inside [EncodedData] as a flatbuffer.
263    pub fn schema_to_bytes_with_dictionary_tracker(
264        &self,
265        schema: &Schema,
266        dictionary_tracker: &mut DictionaryTracker,
267        write_options: &IpcWriteOptions,
268    ) -> EncodedData {
269        let mut fbb = FlatBufferBuilder::new();
270        let schema = {
271            let fb = IpcSchemaEncoder::new()
272                .with_dictionary_tracker(dictionary_tracker)
273                .schema_to_fb_offset(&mut fbb, schema);
274            fb.as_union_value()
275        };
276
277        let mut message = crate::MessageBuilder::new(&mut fbb);
278        message.add_version(write_options.metadata_version);
279        message.add_header_type(crate::MessageHeader::Schema);
280        message.add_bodyLength(0);
281        message.add_header(schema);
282        // TODO: custom metadata
283        let data = message.finish();
284        fbb.finish(data, None);
285
286        let data = fbb.finished_data();
287        EncodedData {
288            ipc_message: data.to_vec(),
289            arrow_data: vec![],
290        }
291    }
292
293    fn _encode_dictionaries<I: Iterator<Item = i64>>(
294        &self,
295        column: &ArrayRef,
296        encoded_dictionaries: &mut Vec<EncodedData>,
297        dictionary_tracker: &mut DictionaryTracker,
298        write_options: &IpcWriteOptions,
299        dict_id: &mut I,
300        compression_context: &mut CompressionContext,
301    ) -> Result<(), ArrowError> {
302        match column.data_type() {
303            DataType::Struct(fields) => {
304                let s = as_struct_array(column);
305                for (field, column) in fields.iter().zip(s.columns()) {
306                    self.encode_dictionaries(
307                        field,
308                        column,
309                        encoded_dictionaries,
310                        dictionary_tracker,
311                        write_options,
312                        dict_id,
313                        compression_context,
314                    )?;
315                }
316            }
317            DataType::RunEndEncoded(_, values) => {
318                let data = column.to_data();
319                if data.child_data().len() != 2 {
320                    return Err(ArrowError::InvalidArgumentError(format!(
321                        "The run encoded array should have exactly two child arrays. Found {}",
322                        data.child_data().len()
323                    )));
324                }
325                // The run_ends array is not expected to be dictionary encoded. Hence encode dictionaries
326                // only for values array.
327                let values_array = make_array(data.child_data()[1].clone());
328                self.encode_dictionaries(
329                    values,
330                    &values_array,
331                    encoded_dictionaries,
332                    dictionary_tracker,
333                    write_options,
334                    dict_id,
335                    compression_context,
336                )?;
337            }
338            DataType::List(field) => {
339                let list = as_list_array(column);
340                self.encode_dictionaries(
341                    field,
342                    list.values(),
343                    encoded_dictionaries,
344                    dictionary_tracker,
345                    write_options,
346                    dict_id,
347                    compression_context,
348                )?;
349            }
350            DataType::LargeList(field) => {
351                let list = as_large_list_array(column);
352                self.encode_dictionaries(
353                    field,
354                    list.values(),
355                    encoded_dictionaries,
356                    dictionary_tracker,
357                    write_options,
358                    dict_id,
359                    compression_context,
360                )?;
361            }
362            DataType::ListView(field) => {
363                let list = column.as_list_view::<i32>();
364                self.encode_dictionaries(
365                    field,
366                    list.values(),
367                    encoded_dictionaries,
368                    dictionary_tracker,
369                    write_options,
370                    dict_id,
371                    compression_context,
372                )?;
373            }
374            DataType::LargeListView(field) => {
375                let list = column.as_list_view::<i64>();
376                self.encode_dictionaries(
377                    field,
378                    list.values(),
379                    encoded_dictionaries,
380                    dictionary_tracker,
381                    write_options,
382                    dict_id,
383                    compression_context,
384                )?;
385            }
386            DataType::FixedSizeList(field, _) => {
387                let list = column
388                    .as_any()
389                    .downcast_ref::<FixedSizeListArray>()
390                    .expect("Unable to downcast to fixed size list array");
391                self.encode_dictionaries(
392                    field,
393                    list.values(),
394                    encoded_dictionaries,
395                    dictionary_tracker,
396                    write_options,
397                    dict_id,
398                    compression_context,
399                )?;
400            }
401            DataType::Map(field, _) => {
402                let map_array = as_map_array(column);
403
404                let (keys, values) = match field.data_type() {
405                    DataType::Struct(fields) if fields.len() == 2 => (&fields[0], &fields[1]),
406                    _ => panic!("Incorrect field data type {:?}", field.data_type()),
407                };
408
409                // keys
410                self.encode_dictionaries(
411                    keys,
412                    map_array.keys(),
413                    encoded_dictionaries,
414                    dictionary_tracker,
415                    write_options,
416                    dict_id,
417                    compression_context,
418                )?;
419
420                // values
421                self.encode_dictionaries(
422                    values,
423                    map_array.values(),
424                    encoded_dictionaries,
425                    dictionary_tracker,
426                    write_options,
427                    dict_id,
428                    compression_context,
429                )?;
430            }
431            DataType::Union(fields, _) => {
432                let union = as_union_array(column);
433                for (type_id, field) in fields.iter() {
434                    let column = union.child(type_id);
435                    self.encode_dictionaries(
436                        field,
437                        column,
438                        encoded_dictionaries,
439                        dictionary_tracker,
440                        write_options,
441                        dict_id,
442                        compression_context,
443                    )?;
444                }
445            }
446            _ => (),
447        }
448
449        Ok(())
450    }
451
452    #[allow(clippy::too_many_arguments)]
453    fn encode_dictionaries<I: Iterator<Item = i64>>(
454        &self,
455        field: &Field,
456        column: &ArrayRef,
457        encoded_dictionaries: &mut Vec<EncodedData>,
458        dictionary_tracker: &mut DictionaryTracker,
459        write_options: &IpcWriteOptions,
460        dict_id_seq: &mut I,
461        compression_context: &mut CompressionContext,
462    ) -> Result<(), ArrowError> {
463        match column.data_type() {
464            DataType::Dictionary(_key_type, _value_type) => {
465                let dict_data = column.to_data();
466                let dict_values = &dict_data.child_data()[0];
467
468                let values = make_array(dict_data.child_data()[0].clone());
469
470                self._encode_dictionaries(
471                    &values,
472                    encoded_dictionaries,
473                    dictionary_tracker,
474                    write_options,
475                    dict_id_seq,
476                    compression_context,
477                )?;
478
479                // It's important to only take the dict_id at this point, because the dict ID
480                // sequence is assigned depth-first, so we need to first encode children and have
481                // them take their assigned dict IDs before we take the dict ID for this field.
482                let dict_id = dict_id_seq.next().ok_or_else(|| {
483                    ArrowError::IpcError(format!(
484                        "no dict id for field {:?}: field.data_type={:?}, column.data_type={:?}",
485                        field.name(),
486                        field.data_type(),
487                        column.data_type()
488                    ))
489                })?;
490
491                match dictionary_tracker.insert_column(
492                    dict_id,
493                    column,
494                    write_options.dictionary_handling,
495                )? {
496                    DictionaryUpdate::None => {}
497                    DictionaryUpdate::New | DictionaryUpdate::Replaced => {
498                        encoded_dictionaries.push(self.dictionary_batch_to_bytes(
499                            dict_id,
500                            dict_values,
501                            write_options,
502                            false,
503                            compression_context,
504                        )?);
505                    }
506                    DictionaryUpdate::Delta(data) => {
507                        encoded_dictionaries.push(self.dictionary_batch_to_bytes(
508                            dict_id,
509                            &data,
510                            write_options,
511                            true,
512                            compression_context,
513                        )?);
514                    }
515                }
516            }
517            _ => self._encode_dictionaries(
518                column,
519                encoded_dictionaries,
520                dictionary_tracker,
521                write_options,
522                dict_id_seq,
523                compression_context,
524            )?,
525        }
526
527        Ok(())
528    }
529
530    /// Encodes a batch to a number of [EncodedData] items (dictionary batches + the record batch).
531    /// The [DictionaryTracker] keeps track of dictionaries with new `dict_id`s  (so they are only sent once)
532    /// Make sure the [DictionaryTracker] is initialized at the start of the stream.
533    pub fn encode(
534        &self,
535        batch: &RecordBatch,
536        dictionary_tracker: &mut DictionaryTracker,
537        write_options: &IpcWriteOptions,
538        compression_context: &mut CompressionContext,
539    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
540        let encoded_dictionaries = self.encode_all_dicts(
541            batch,
542            dictionary_tracker,
543            write_options,
544            compression_context,
545        )?;
546        let mut arrow_data = Vec::new();
547        let (ipc_message, _, tail_pad) = self.record_batch_to_bytes(
548            batch,
549            write_options,
550            compression_context,
551            &mut IpcBodySink::Write(&mut arrow_data),
552        )?;
553        arrow_data.extend_from_slice(&PADDING[..tail_pad]);
554        Ok((
555            encoded_dictionaries,
556            EncodedData {
557                ipc_message,
558                arrow_data,
559            },
560        ))
561    }
562
563    /// Encode dictionary batches for all columns in `batch`.
564    fn encode_all_dicts(
565        &self,
566        batch: &RecordBatch,
567        dictionary_tracker: &mut DictionaryTracker,
568        write_options: &IpcWriteOptions,
569        compression_context: &mut CompressionContext,
570    ) -> Result<Vec<EncodedData>, ArrowError> {
571        let schema = batch.schema();
572        let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len());
573        let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter();
574        for (i, field) in schema.fields().iter().enumerate() {
575            self.encode_dictionaries(
576                field,
577                batch.column(i),
578                &mut encoded_dictionaries,
579                dictionary_tracker,
580                write_options,
581                &mut dict_id,
582                compression_context,
583            )?;
584        }
585        Ok(encoded_dictionaries)
586    }
587
588    /// Write dictionary batches and the record batch directly to `writer`, skipping the
589    /// intermediate body `Vec<u8>` allocations
590    /// Returns [`IpcWriteMetadata`] with the sizes needed to build footer blocks.
591    fn write<W: Write>(
592        &self,
593        batch: &RecordBatch,
594        dictionary_tracker: &mut DictionaryTracker,
595        write_options: &IpcWriteOptions,
596        compression_context: &mut CompressionContext,
597        writer: &mut W,
598    ) -> Result<IpcWriteMetadata, ArrowError> {
599        let encoded_dictionaries = self.encode_all_dicts(
600            batch,
601            dictionary_tracker,
602            write_options,
603            compression_context,
604        )?;
605
606        let mut dictionary_block_sizes = Vec::with_capacity(encoded_dictionaries.len());
607        for dict in encoded_dictionaries {
608            dictionary_block_sizes.push(write_message(&mut *writer, dict, write_options)?);
609        }
610
611        let capacity = batch
612            .columns()
613            .iter()
614            .map(|a| estimate_encoded_buffer_count(a.data_type()))
615            .sum();
616        let mut encoded_buffers: Vec<EncodedBuffer> = Vec::with_capacity(capacity);
617        let (ipc_message, body_len, tail_pad) = self.record_batch_to_bytes(
618            batch,
619            write_options,
620            compression_context,
621            &mut IpcBodySink::Collect(&mut encoded_buffers),
622        )?;
623
624        let alignment = write_options.alignment;
625        let a = usize::from(alignment - 1);
626        let prefix_size = if write_options.write_legacy_ipc_format {
627            4
628        } else {
629            8
630        };
631        let aligned_size = (ipc_message.len() + prefix_size + a) & !a;
632        write_continuation(
633            &mut *writer,
634            write_options,
635            (aligned_size - prefix_size) as i32,
636        )?;
637        writer.write_all(&ipc_message)?;
638        writer.write_all(&PADDING[..aligned_size - ipc_message.len() - prefix_size])?;
639        for enc in &encoded_buffers {
640            writer.write_all(enc.as_slice())?;
641            writer.write_all(&PADDING[..pad_to_alignment(alignment, enc.len())])?;
642        }
643        writer.write_all(&PADDING[..tail_pad])?;
644
645        Ok(IpcWriteMetadata {
646            dictionary_block_sizes,
647            padded_header_len: aligned_size,
648            body_len,
649        })
650    }
651
652    /// Encodes a batch to a number of [EncodedData] items (dictionary batches + the record batch).
653    /// The [DictionaryTracker] keeps track of dictionaries with new `dict_id`s  (so they are only sent once)
654    /// Make sure the [DictionaryTracker] is initialized at the start of the stream.
655    #[deprecated(since = "57.0.0", note = "Use `encode` instead")]
656    pub fn encoded_batch(
657        &self,
658        batch: &RecordBatch,
659        dictionary_tracker: &mut DictionaryTracker,
660        write_options: &IpcWriteOptions,
661    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
662        self.encode(
663            batch,
664            dictionary_tracker,
665            write_options,
666            &mut Default::default(),
667        )
668    }
669
670    /// Encodes a `RecordBatch` into a flatbuffer IPC message and fills `sink` with the
671    /// serialised buffer data.
672    ///
673    /// Returns `(ipc_message, body_len, tail_pad)`: the flatbuffer header bytes, the
674    /// total body length including trailing padding, and the trailing alignment padding byte count.
675    fn record_batch_to_bytes(
676        &self,
677        batch: &RecordBatch,
678        write_options: &IpcWriteOptions,
679        compression_context: &mut CompressionContext,
680        sink: &mut IpcBodySink<'_>,
681    ) -> Result<(Vec<u8>, usize, usize), ArrowError> {
682        let mut fbb = FlatBufferBuilder::new();
683
684        let mut nodes: Vec<crate::FieldNode> = vec![];
685        let mut buffers: Vec<crate::Buffer> = vec![];
686
687        let batch_compression_type = write_options.batch_compression_type;
688
689        let compression = batch_compression_type.map(|batch_compression_type| {
690            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
691            c.add_method(crate::BodyCompressionMethod::BUFFER);
692            c.add_codec(batch_compression_type);
693            c.finish()
694        });
695
696        let compression_codec: Option<CompressionCodec> =
697            batch_compression_type.map(TryInto::try_into).transpose()?;
698
699        let alignment = write_options.alignment;
700        let mut variadic_buffer_counts = vec![];
701        let mut offset = 0i64;
702
703        for array in batch.columns() {
704            let array_data = array.to_data();
705            offset = write_array_data(
706                &array_data,
707                &mut buffers,
708                sink,
709                &mut nodes,
710                offset,
711                array.len(),
712                array.null_count(),
713                compression_codec,
714                compression_context,
715                write_options,
716            )?;
717            append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data);
718        }
719
720        let tail_pad = pad_to_alignment(alignment, offset as usize);
721        let body_len = offset as usize + tail_pad;
722
723        let buffers = fbb.create_vector(&buffers);
724        let nodes = fbb.create_vector(&nodes);
725        let variadic_buffer = if variadic_buffer_counts.is_empty() {
726            None
727        } else {
728            Some(fbb.create_vector(&variadic_buffer_counts))
729        };
730
731        let root = {
732            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
733            batch_builder.add_length(batch.num_rows() as i64);
734            batch_builder.add_nodes(nodes);
735            batch_builder.add_buffers(buffers);
736            if let Some(c) = compression {
737                batch_builder.add_compression(c);
738            }
739            if let Some(v) = variadic_buffer {
740                batch_builder.add_variadicBufferCounts(v);
741            }
742            batch_builder.finish().as_union_value()
743        };
744        // create an crate::Message
745        let mut message = crate::MessageBuilder::new(&mut fbb);
746        message.add_version(write_options.metadata_version);
747        message.add_header_type(crate::MessageHeader::RecordBatch);
748        message.add_bodyLength(body_len as i64);
749        message.add_header(root);
750        let root = message.finish();
751        fbb.finish(root, None);
752
753        Ok((fbb.finished_data().to_vec(), body_len, tail_pad))
754    }
755
    /// Write dictionary values into two sets of bytes, one for the header (crate::Message) and the
    /// other for the data
    ///
    /// The header is a flatbuffer `Message` whose payload is a `DictionaryBatch` wrapping a
    /// `RecordBatch` that describes `array_data`; the body holds the bytes produced by
    /// `write_array_data`, padded at the tail to `write_options.alignment`.
    ///
    /// # Arguments
    ///
    /// * `dict_id` - stored as the `id` of the `DictionaryBatch`
    /// * `array_data` - the dictionary values to encode
    /// * `write_options` - supplies the alignment, metadata version and optional compression type
    /// * `is_delta` - stored as the `isDelta` flag of the `DictionaryBatch`
    /// * `compression_context` - forwarded to `write_array_data`
    ///
    /// # Errors
    ///
    /// Returns an error if the configured compression type cannot be converted into a
    /// `CompressionCodec`, or if `write_array_data` fails.
    fn dictionary_batch_to_bytes(
        &self,
        dict_id: i64,
        array_data: &ArrayData,
        write_options: &IpcWriteOptions,
        is_delta: bool,
        compression_context: &mut CompressionContext,
    ) -> Result<EncodedData, ArrowError> {
        let mut fbb = FlatBufferBuilder::new();

        // Filled in by `write_array_data`: field nodes and buffer descriptors for the
        // header, and the raw body bytes.
        let mut nodes: Vec<crate::FieldNode> = vec![];
        let mut buffers: Vec<crate::Buffer> = vec![];
        let mut arrow_data: Vec<u8> = vec![];

        // get the type of compression
        let batch_compression_type = write_options.batch_compression_type;

        // Only emit a `BodyCompression` table when compression is configured.
        let compression = batch_compression_type.map(|batch_compression_type| {
            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
            c.add_method(crate::BodyCompressionMethod::BUFFER);
            c.add_codec(batch_compression_type);
            c.finish()
        });

        // Resolve the codec used to compress the buffers; the conversion is fallible and
        // its error is propagated to the caller.
        let compression_codec: Option<CompressionCodec> = batch_compression_type
            .map(|batch_compression_type| batch_compression_type.try_into())
            .transpose()?;

        let alignment = write_options.alignment;
        let mut sink = IpcBodySink::Write(&mut arrow_data);
        // Starts at offset 0; the returned offset is used below as the unpadded body length.
        let offset = write_array_data(
            array_data,
            &mut buffers,
            &mut sink,
            &mut nodes,
            0,
            array_data.len(),
            array_data.null_count(),
            compression_codec,
            compression_context,
            write_options,
        )?;

        let mut variadic_buffer_counts = vec![];
        append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);

        // pad the tail of body data
        let tail_pad = pad_to_alignment(alignment, offset as usize);
        let body_len = offset as usize + tail_pad;
        arrow_data.extend_from_slice(&PADDING[..tail_pad]);

        // write data
        // The vectors are created before any table builder is started, because each
        // builder below holds `&mut fbb` until it is finished.
        let buffers = fbb.create_vector(&buffers);
        let nodes = fbb.create_vector(&nodes);
        // The variadic counts field is only written when there is at least one count.
        let variadic_buffer = if variadic_buffer_counts.is_empty() {
            None
        } else {
            Some(fbb.create_vector(&variadic_buffer_counts))
        };

        // Inner `RecordBatch` table describing the dictionary values.
        let root = {
            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
            batch_builder.add_length(array_data.len() as i64);
            batch_builder.add_nodes(nodes);
            batch_builder.add_buffers(buffers);
            if let Some(c) = compression {
                batch_builder.add_compression(c);
            }
            if let Some(v) = variadic_buffer {
                batch_builder.add_variadicBufferCounts(v);
            }
            batch_builder.finish()
        };

        // `DictionaryBatch` table wrapping the record batch with its id and delta flag.
        let root = {
            let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut fbb);
            batch_builder.add_id(dict_id);
            batch_builder.add_data(root);
            batch_builder.add_isDelta(is_delta);
            batch_builder.finish().as_union_value()
        };

        // Outer `Message` table; `bodyLength` includes the tail padding added above.
        let root = {
            let mut message_builder = crate::MessageBuilder::new(&mut fbb);
            message_builder.add_version(write_options.metadata_version);
            message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
            message_builder.add_bodyLength(body_len as i64);
            message_builder.add_header(root);
            message_builder.finish()
        };

        fbb.finish(root, None);
        let finished_data = fbb.finished_data();

        Ok(EncodedData {
            ipc_message: finished_data.to_vec(),
            arrow_data,
        })
    }
857}
858
859fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
860    match array.data_type() {
861        DataType::BinaryView | DataType::Utf8View => {
862            // The spec documents the counts only includes the variadic buffers, not the view/null buffers.
863            // https://arrow.apache.org/docs/format/Columnar.html#variadic-buffers
864            counts.push(array.buffers().len() as i64 - 1);
865        }
866        DataType::Dictionary(_, _) => {
867            // Do nothing
868            // Dictionary types are handled in `encode_dictionaries`.
869        }
870        _ => {
871            for child in array.child_data() {
872                append_variadic_buffer_counts(counts, child)
873            }
874        }
875    }
876}
877
878pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError> {
879    match arr.data_type() {
880        DataType::RunEndEncoded(k, _) => match k.data_type() {
881            DataType::Int16 => {
882                Ok(into_zero_offset_run_array(RunArray::<Int16Type>::from(arr))?.into_data())
883            }
884            DataType::Int32 => {
885                Ok(into_zero_offset_run_array(RunArray::<Int32Type>::from(arr))?.into_data())
886            }
887            DataType::Int64 => {
888                Ok(into_zero_offset_run_array(RunArray::<Int64Type>::from(arr))?.into_data())
889            }
890            d => unreachable!("Unexpected data type {d}"),
891        },
892        d => Err(ArrowError::InvalidArgumentError(format!(
893            "The given array is not a run array. Data type of given array: {d}"
894        ))),
895    }
896}
897
// Returns a `RunArray` with zero offset and length matching the last value
// in run_ends array.
//
// The input is returned untouched when its run ends have offset zero and
// `max_value()` equals `len()`. Otherwise a new run array is built: the run
// ends are rebased by the offset and the values are sliced down to the
// physical range covered by the input.
fn into_zero_offset_run_array<R: RunEndIndexType>(
    run_array: RunArray<R>,
) -> Result<RunArray<R>, ArrowError> {
    let run_ends = run_array.run_ends();
    if run_ends.offset() == 0 && run_ends.max_value() == run_ends.len() {
        return Ok(run_array);
    }

    // The physical index of original run_ends array from which the `ArrayData` is sliced.
    let start_physical_index = run_ends.get_start_physical_index();

    // The physical index of original run_ends array until which the `ArrayData` is sliced.
    let end_physical_index = run_ends.get_end_physical_index();

    // `+ 1` because the end index is treated as inclusive: the loop below covers every
    // run except the last one, which is appended separately.
    let physical_length = end_physical_index - start_physical_index + 1;

    // build new run_ends array by subtracting offset from run ends.
    let offset = R::Native::usize_as(run_ends.offset());
    let mut builder = BufferBuilder::<R::Native>::new(physical_length);
    for run_end_value in &run_ends.values()[start_physical_index..end_physical_index] {
        builder.append(run_end_value.sub_wrapping(offset));
    }
    // The last run end is not derived from the original value; it is set to the length
    // of the input so that the final run stops exactly at the end of the array.
    builder.append(R::Native::from_usize(run_array.len()).unwrap());
    let new_run_ends = unsafe {
        // SAFETY:
        // The function builds a valid run_ends array and hence need not be validated.
        ArrayDataBuilder::new(R::DATA_TYPE)
            .len(physical_length)
            .add_buffer(builder.finish())
            .build_unchecked()
    };

    // build new values by slicing physical indices.
    let new_values = run_array
        .values()
        .slice(start_physical_index, physical_length)
        .into_data();

    // Children are added in the order run ends, then values.
    let builder = ArrayDataBuilder::new(run_array.data_type().clone())
        .len(run_array.len())
        .add_child_data(new_run_ends)
        .add_child_data(new_values);
    let array_data = unsafe {
        // SAFETY:
        //  This function builds a valid run array and hence can skip validation.
        builder.build_unchecked()
    };
    Ok(array_data.into())
}
949
/// Controls how dictionaries are handled in Arrow IPC messages
///
/// This is the `dict_handling` argument of [`DictionaryTracker::insert_column`], where it
/// decides what happens when a previously written dictionary has only gained new values.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DictionaryHandling {
    /// Send the entire dictionary every time it is encountered (default)
    #[default]
    Resend,
    /// Send only new dictionary values since the last batch (delta encoding)
    ///
    /// When a dictionary is first encountered, the entire dictionary is sent.
    /// For subsequent batches, only values that are new (not previously sent)
    /// are transmitted with the `isDelta` flag set to true.
    Delta,
}
963
/// Describes what kind of update took place after a call to
/// [`DictionaryTracker::insert_column`].
#[derive(Debug, Clone)]
pub enum DictionaryUpdate {
    /// No dictionary was written, the dictionary was identical to what was already
    /// in the tracker.
    None,
    /// No dictionary was present in the tracker
    New,
    /// Dictionary was replaced with the new data
    Replaced,
    /// Dictionary was updated, ArrayData is the delta between old and new,
    /// i.e. the values of the new dictionary that follow the old ones
    Delta(ArrayData),
}
977
/// Keeps track of dictionaries that have been written, to avoid emitting the same dictionary
/// multiple times.
///
/// Can optionally error if an update to an existing dictionary is attempted, which
/// isn't allowed in the `FileWriter`.
#[derive(Debug)]
pub struct DictionaryTracker {
    // NOTE: When adding fields, update the clear() method accordingly.
    // Most recently recorded dictionary data, keyed by dictionary ID.
    written: HashMap<i64, ArrayData>,
    // IDs handed out by `next_dict_id`, in the order they were assigned.
    dict_ids: Vec<i64>,
    // When true, replacing an already written dictionary is reported as an error.
    error_on_replacement: bool,
}
990
991impl DictionaryTracker {
992    /// Create a new [`DictionaryTracker`].
993    ///
994    /// If `error_on_replacement`
995    /// is true, an error will be generated if an update to an
996    /// existing dictionary is attempted.
997    pub fn new(error_on_replacement: bool) -> Self {
998        #[allow(deprecated)]
999        Self {
1000            written: HashMap::new(),
1001            dict_ids: Vec::new(),
1002            error_on_replacement,
1003        }
1004    }
1005
1006    /// Record and return the next dictionary ID.
1007    pub fn next_dict_id(&mut self) -> i64 {
1008        let next = self
1009            .dict_ids
1010            .last()
1011            .copied()
1012            .map(|i| i + 1)
1013            .unwrap_or_default();
1014
1015        self.dict_ids.push(next);
1016        next
1017    }
1018
1019    /// Return the sequence of dictionary IDs in the order they should be observed while
1020    /// traversing the schema
1021    pub fn dict_id(&mut self) -> &[i64] {
1022        &self.dict_ids
1023    }
1024
1025    /// Keep track of the dictionary with the given ID and values. Behavior:
1026    ///
1027    /// * If this ID has been written already and has the same data, return `Ok(false)` to indicate
1028    ///   that the dictionary was not actually inserted (because it's already been seen).
1029    /// * If this ID has been written already but with different data, and this tracker is
1030    ///   configured to return an error, return an error.
1031    /// * If the tracker has not been configured to error on replacement or this dictionary
1032    ///   has never been seen before, return `Ok(true)` to indicate that the dictionary was just
1033    ///   inserted.
1034    #[deprecated(since = "56.1.0", note = "Use `insert_column` instead")]
1035    pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> {
1036        let dict_data = column.to_data();
1037        let dict_values = &dict_data.child_data()[0];
1038
1039        // If a dictionary with this id was already emitted, check if it was the same.
1040        if let Some(last) = self.written.get(&dict_id) {
1041            if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
1042                // Same dictionary values => no need to emit it again
1043                return Ok(false);
1044            }
1045            if self.error_on_replacement {
1046                // If error on replacement perform a logical comparison
1047                if last.child_data()[0] == *dict_values {
1048                    // Same dictionary values => no need to emit it again
1049                    return Ok(false);
1050                }
1051                return Err(ArrowError::InvalidArgumentError(
1052                    "Dictionary replacement detected when writing IPC file format. \
1053                     Arrow IPC files only support a single dictionary for a given field \
1054                     across all batches."
1055                        .to_string(),
1056                ));
1057            }
1058        }
1059
1060        self.written.insert(dict_id, dict_data);
1061        Ok(true)
1062    }
1063
1064    /// Keep track of the dictionary with the given ID and values. The return
1065    /// value indicates what, if any, update to the internal map took place
1066    /// and how it should be interpreted based on the `dict_handling` parameter.
1067    ///
1068    /// # Returns
1069    ///
1070    /// * `Ok(Dictionary::New)` - If the dictionary was not previously written
1071    /// * `Ok(Dictionary::Replaced)` - If the dictionary was previously written
1072    ///   with completely different data, or if the data is a delta of the existing,
1073    ///   but with `dict_handling` set to `DictionaryHandling::Resend`
1074    /// * `Ok(Dictionary::Delta)` - If the dictionary was previously written, but
1075    ///   the new data is a delta of the old and the `dict_handling` is set to
1076    ///   `DictionaryHandling::Delta`
1077    /// * `Err(e)` - If the dictionary was previously written with different data,
1078    ///   and `error_on_replacement` is set to `true`.
1079    pub fn insert_column(
1080        &mut self,
1081        dict_id: i64,
1082        column: &ArrayRef,
1083        dict_handling: DictionaryHandling,
1084    ) -> Result<DictionaryUpdate, ArrowError> {
1085        let new_data = column.to_data();
1086        let new_values = &new_data.child_data()[0];
1087
1088        // If there is no existing dictionary with this ID, we always insert
1089        let Some(old) = self.written.get(&dict_id) else {
1090            self.written.insert(dict_id, new_data);
1091            return Ok(DictionaryUpdate::New);
1092        };
1093
1094        // Fast path - If the array data points to the same buffer as the
1095        // existing then they're the same.
1096        let old_values = &old.child_data()[0];
1097        if ArrayData::ptr_eq(old_values, new_values) {
1098            return Ok(DictionaryUpdate::None);
1099        }
1100
1101        // Slow path - Compare the dictionaries value by value
1102        let comparison = compare_dictionaries(old_values, new_values);
1103        if matches!(comparison, DictionaryComparison::Equal) {
1104            return Ok(DictionaryUpdate::None);
1105        }
1106
1107        const REPLACEMENT_ERROR: &str = "Dictionary replacement detected when writing IPC file format. \
1108                 Arrow IPC files only support a single dictionary for a given field \
1109                 across all batches.";
1110
1111        match comparison {
1112            DictionaryComparison::NotEqual => {
1113                if self.error_on_replacement {
1114                    return Err(ArrowError::InvalidArgumentError(
1115                        REPLACEMENT_ERROR.to_string(),
1116                    ));
1117                }
1118
1119                self.written.insert(dict_id, new_data);
1120                Ok(DictionaryUpdate::Replaced)
1121            }
1122            DictionaryComparison::Delta => match dict_handling {
1123                DictionaryHandling::Resend => {
1124                    if self.error_on_replacement {
1125                        return Err(ArrowError::InvalidArgumentError(
1126                            REPLACEMENT_ERROR.to_string(),
1127                        ));
1128                    }
1129
1130                    self.written.insert(dict_id, new_data);
1131                    Ok(DictionaryUpdate::Replaced)
1132                }
1133                DictionaryHandling::Delta => {
1134                    let delta =
1135                        new_values.slice(old_values.len(), new_values.len() - old_values.len());
1136                    self.written.insert(dict_id, new_data);
1137                    Ok(DictionaryUpdate::Delta(delta))
1138                }
1139            },
1140            DictionaryComparison::Equal => unreachable!("Already checked equal case"),
1141        }
1142    }
1143
    /// Clears the state of the dictionary tracker.
    ///
    /// This allows the dictionary tracker to be reused for a new IPC stream while avoiding the
    /// allocation cost of creating a new instance. This method should not be called if
    /// the dictionary tracker will be used to continue writing to an existing IPC stream.
    ///
    /// Both the assigned dictionary IDs and the recorded dictionaries are dropped; the
    /// `error_on_replacement` setting is left as it was.
    pub fn clear(&mut self) {
        self.dict_ids.clear();
        self.written.clear();
    }
1153}
1154
/// Describes how two dictionary arrays compare to each other.
///
/// Produced by `compare_dictionaries`, where the first array is the previously
/// written dictionary and the second is the new one.
#[derive(Debug, Clone)]
enum DictionaryComparison {
    /// Neither a delta, nor an exact match
    NotEqual,
    /// Exact element-wise match
    Equal,
    /// The two arrays are dictionary deltas of each other, meaning the first
    /// is a prefix of the second.
    Delta,
}
1166
1167// Compares two dictionaries and returns a [`DictionaryComparison`].
1168fn compare_dictionaries(old: &ArrayData, new: &ArrayData) -> DictionaryComparison {
1169    // Check for exact match
1170    let existing_len = old.len();
1171    let new_len = new.len();
1172    if existing_len == new_len {
1173        if *old == *new {
1174            return DictionaryComparison::Equal;
1175        } else {
1176            return DictionaryComparison::NotEqual;
1177        }
1178    }
1179
1180    // Can't be a delta if the new is shorter than the existing
1181    if new_len < existing_len {
1182        return DictionaryComparison::NotEqual;
1183    }
1184
1185    // Check for delta
1186    if new.slice(0, existing_len) == *old {
1187        return DictionaryComparison::Delta;
1188    }
1189
1190    DictionaryComparison::NotEqual
1191}
1192
1193/// Arrow File Writer
1194///
1195/// Writes Arrow [`RecordBatch`]es in the [IPC File Format].
1196///
1197/// # See Also
1198///
1199/// * [`StreamWriter`] for writing IPC Streams
1200///
1201/// # Example
1202/// ```
1203/// # use arrow_array::record_batch;
1204/// # use arrow_ipc::writer::FileWriter;
1205/// # let mut file = vec![]; // mimic a file for the example
1206/// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
1207/// // create a new writer, the schema must be known in advance
1208/// let mut writer = FileWriter::try_new(&mut file, &batch.schema()).unwrap();
1209/// // write each batch to the underlying writer
1210/// writer.write(&batch).unwrap();
1211/// // When all batches are written, call finish to flush all buffers
1212/// writer.finish().unwrap();
1213/// ```
1214/// [IPC File Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format
pub struct FileWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// A reference to the schema, used in validating record batches and
    /// encoded again into the footer
    schema: SchemaRef,
    /// Running byte offset at which the next block starts; recorded in each
    /// footer block entry as an offset for random access
    block_offsets: usize,
    /// Dictionary blocks that will be written as part of the IPC footer
    dictionary_blocks: Vec<crate::Block>,
    /// Record blocks that will be written as part of the IPC footer
    record_blocks: Vec<crate::Block>,
    /// Whether the writer footer has been written, and the writer is finished
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,
    /// User level customized metadata
    custom_metadata: HashMap<String, String>,

    /// Encodes the schema and the record batches into IPC messages
    data_gen: IpcDataGenerator,

    /// Compression state handed to `data_gen` on every write
    compression_context: CompressionContext,
}
1239
1240impl<W: Write> FileWriter<BufWriter<W>> {
1241    /// Try to create a new file writer with the writer wrapped in a BufWriter.
1242    ///
1243    /// See [`FileWriter::try_new`] for an unbuffered version.
1244    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1245        Self::try_new(BufWriter::new(writer), schema)
1246    }
1247}
1248
1249impl<W: Write> FileWriter<W> {
1250    /// Try to create a new writer, with the schema written as part of the header
1251    ///
1252    /// Note the created writer is not buffered. See [`FileWriter::try_new_buffered`] for details.
1253    ///
1254    /// # Errors
1255    ///
1256    /// An ['Err'](Result::Err) may be returned if writing the header to the writer fails.
1257    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1258        let write_options = IpcWriteOptions::default();
1259        Self::try_new_with_options(writer, schema, write_options)
1260    }
1261
1262    /// Try to create a new writer with IpcWriteOptions
1263    ///
1264    /// Note the created writer is not buffered. See [`FileWriter::try_new_buffered`] for details.
1265    ///
1266    /// # Errors
1267    ///
1268    /// An ['Err'](Result::Err) may be returned if writing the header to the writer fails.
1269    pub fn try_new_with_options(
1270        mut writer: W,
1271        schema: &Schema,
1272        write_options: IpcWriteOptions,
1273    ) -> Result<Self, ArrowError> {
1274        let data_gen = IpcDataGenerator::default();
1275        // write magic to header aligned on alignment boundary
1276        let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
1277        let header_size = super::ARROW_MAGIC.len() + pad_len;
1278        writer.write_all(&super::ARROW_MAGIC)?;
1279        writer.write_all(&PADDING[..pad_len])?;
1280        // write the schema, set the written bytes to the schema + header
1281        let mut dictionary_tracker = DictionaryTracker::new(true);
1282        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
1283            schema,
1284            &mut dictionary_tracker,
1285            &write_options,
1286        );
1287        let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
1288        Ok(Self {
1289            writer,
1290            write_options,
1291            schema: Arc::new(schema.clone()),
1292            block_offsets: meta + data + header_size,
1293            dictionary_blocks: vec![],
1294            record_blocks: vec![],
1295            finished: false,
1296            dictionary_tracker,
1297            custom_metadata: HashMap::new(),
1298            data_gen,
1299            compression_context: CompressionContext::default(),
1300        })
1301    }
1302
    /// Adds a key-value pair to the [FileWriter]'s custom metadata
    ///
    /// A value already stored under the same key is replaced. The metadata is kept in
    /// memory and only becomes part of the file when the footer is written by
    /// [`FileWriter::finish`].
    pub fn write_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
        self.custom_metadata.insert(key.into(), value.into());
    }
1307
1308    /// Write a record batch to the file
1309    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1310        if self.finished {
1311            return Err(ArrowError::IpcError(
1312                "Cannot write record batch to file writer as it is closed".to_string(),
1313            ));
1314        }
1315
1316        let meta = self.data_gen.write(
1317            batch,
1318            &mut self.dictionary_tracker,
1319            &self.write_options,
1320            &mut self.compression_context,
1321            &mut self.writer,
1322        )?;
1323
1324        for (header_len, body_len) in meta.dictionary_block_sizes {
1325            let block = crate::Block::new(
1326                self.block_offsets as i64,
1327                header_len as i32,
1328                body_len as i64,
1329            );
1330            self.dictionary_blocks.push(block);
1331            self.block_offsets += header_len + body_len;
1332        }
1333
1334        // add a record block for the footer
1335        let block = crate::Block::new(
1336            self.block_offsets as i64,
1337            meta.padded_header_len as i32,
1338            meta.body_len as i64,
1339        );
1340        self.record_blocks.push(block);
1341        self.block_offsets += meta.padded_header_len + meta.body_len;
1342        Ok(())
1343    }
1344
1345    /// Write footer and closing tag, then mark the writer as done
1346    pub fn finish(&mut self) -> Result<(), ArrowError> {
1347        if self.finished {
1348            return Err(ArrowError::IpcError(
1349                "Cannot write footer to file writer as it is closed".to_string(),
1350            ));
1351        }
1352
1353        // write EOS
1354        write_continuation(&mut self.writer, &self.write_options, 0)?;
1355
1356        let mut fbb = FlatBufferBuilder::new();
1357        let dictionaries = fbb.create_vector(&self.dictionary_blocks);
1358        let record_batches = fbb.create_vector(&self.record_blocks);
1359
1360        // dictionaries are already written, so we can reset dictionary tracker to reuse for schema
1361        self.dictionary_tracker.clear();
1362        let schema = IpcSchemaEncoder::new()
1363            .with_dictionary_tracker(&mut self.dictionary_tracker)
1364            .schema_to_fb_offset(&mut fbb, &self.schema);
1365        let fb_custom_metadata = (!self.custom_metadata.is_empty())
1366            .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));
1367
1368        let root = {
1369            let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
1370            footer_builder.add_version(self.write_options.metadata_version);
1371            footer_builder.add_schema(schema);
1372            footer_builder.add_dictionaries(dictionaries);
1373            footer_builder.add_recordBatches(record_batches);
1374            if let Some(fb_custom_metadata) = fb_custom_metadata {
1375                footer_builder.add_custom_metadata(fb_custom_metadata);
1376            }
1377            footer_builder.finish()
1378        };
1379        fbb.finish(root, None);
1380        let footer_data = fbb.finished_data();
1381        self.writer.write_all(footer_data)?;
1382        self.writer
1383            .write_all(&(footer_data.len() as i32).to_le_bytes())?;
1384        self.writer.write_all(&super::ARROW_MAGIC)?;
1385        self.writer.flush()?;
1386        self.finished = true;
1387
1388        Ok(())
1389    }
1390
    /// Returns the arrow [`SchemaRef`] for this arrow file.
    ///
    /// This is the schema the writer was created with.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
1395
    /// Gets a reference to the underlying writer.
    ///
    /// The writer is returned as is; nothing is flushed or finished by this call.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }
1400
    /// Gets a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer: bytes written this
    /// way are not reflected in the block offsets this `FileWriter` records for the footer.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }
1407
    /// Flush the underlying writer.
    ///
    /// When the writer is a [`BufWriter`] (for example one created through
    /// [`FileWriter::try_new_buffered`]), both the BufWriter and the writer it wraps are
    /// flushed. This does not write the footer; see [`FileWriter::finish`] for that.
    ///
    /// # Errors
    ///
    /// Returns an error if flushing the underlying writer fails.
    pub fn flush(&mut self) -> Result<(), ArrowError> {
        self.writer.flush()?;
        Ok(())
    }
1415
1416    /// Unwraps the underlying writer.
1417    ///
1418    /// The writer is flushed and the FileWriter is finished before returning.
1419    ///
1420    /// # Errors
1421    ///
1422    /// An ['Err'](Result::Err) may be returned if an error occurs while finishing the StreamWriter
1423    /// or while flushing the writer.
1424    pub fn into_inner(mut self) -> Result<W, ArrowError> {
1425        if !self.finished {
1426            // `finish` flushes the writer.
1427            self.finish()?;
1428        }
1429        Ok(self.writer)
1430    }
1431}
1432
// `RecordBatchWriter` implementation that forwards to the inherent `FileWriter` methods.
impl<W: Write> RecordBatchWriter for FileWriter<W> {
    // Inherent methods take precedence over trait methods in method-call syntax, so this
    // resolves to the inherent `FileWriter::write` rather than calling itself.
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    // Writes the footer through `FileWriter::finish`, which returns an error if the
    // writer has already been finished.
    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}
1442
1443/// Arrow Stream Writer
1444///
1445/// Writes Arrow [`RecordBatch`]es to bytes using the [IPC Streaming Format].
1446///
1447/// # See Also
1448///
1449/// * [`FileWriter`] for writing IPC Files
1450///
1451/// # Example - Basic usage
1452/// ```
1453/// # use arrow_array::record_batch;
1454/// # use arrow_ipc::writer::StreamWriter;
1455/// # let mut stream = vec![]; // mimic a stream for the example
1456/// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
1457/// // create a new writer, the schema must be known in advance
1458/// let mut writer = StreamWriter::try_new(&mut stream, &batch.schema()).unwrap();
1459/// // write each batch to the underlying stream
1460/// writer.write(&batch).unwrap();
1461/// // When all batches are written, call finish to flush all buffers
1462/// writer.finish().unwrap();
1463/// ```
1464/// # Example - Efficient delta dictionaries
1465/// ```
1466/// # use arrow_array::record_batch;
1467/// # use arrow_ipc::writer::{StreamWriter, IpcWriteOptions};
1468/// # use arrow_ipc::writer::DictionaryHandling;
1469/// # use arrow_schema::{DataType, Field, Schema, SchemaRef};
1470/// # use arrow_array::{
1471/// #    builder::StringDictionaryBuilder, types::Int32Type, Array, ArrayRef, DictionaryArray,
1472/// #    RecordBatch, StringArray,
1473/// # };
1474/// # use std::sync::Arc;
1475///
1476/// let schema = Arc::new(Schema::new(vec![Field::new(
1477///    "col1",
1478///    DataType::Dictionary(Box::from(DataType::Int32), Box::from(DataType::Utf8)),
1479///    true,
1480/// )]));
1481///
1482/// let mut builder = StringDictionaryBuilder::<arrow_array::types::Int32Type>::new();
1483///
1484/// // `finish_preserve_values` will keep the dictionary values along with their
1485/// // key assignments so that they can be re-used in the next batch.
1486/// builder.append("a").unwrap();
1487/// builder.append("b").unwrap();
1488/// let array1 = builder.finish_preserve_values();
1489/// let batch1 = RecordBatch::try_new(schema.clone(), vec![Arc::new(array1) as ArrayRef]).unwrap();
1490///
1491/// // In this batch, 'a' will have the same dictionary key as 'a' in the previous batch,
1492/// // and 'd' will take the next available key.
1493/// builder.append("a").unwrap();
1494/// builder.append("d").unwrap();
1495/// let array2 = builder.finish_preserve_values();
1496/// let batch2 = RecordBatch::try_new(schema.clone(), vec![Arc::new(array2) as ArrayRef]).unwrap();
1497///
1498/// let mut stream = vec![];
1499/// // You must set `.with_dictionary_handling(DictionaryHandling::Delta)` to
1500/// // enable delta dictionaries in the writer
1501/// let options = IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta);
1502/// let mut writer = StreamWriter::try_new_with_options(&mut stream, &schema, options).unwrap();
1503///
1504/// // When writing the first batch, a dictionary message with 'a' and 'b' will be written
1505/// // prior to the record batch.
1506/// writer.write(&batch1).unwrap();
1507/// // With the second batch only a delta dictionary with 'd' will be written
1508/// // prior to the record batch. This is only possible with `finish_preserve_values`.
1509/// // Without it, 'a' and 'd' in this batch would have different keys than the
1510/// // first batch and so we'd have to send a replacement dictionary with new keys
1511/// // for both.
1512/// writer.write(&batch2).unwrap();
1513/// writer.finish().unwrap();
1514/// ```
1515/// [IPC Streaming Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
pub struct StreamWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// Whether the closing continuation bytes have been written, and the writer is finished
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,

    /// Encodes the schema and the record batches into IPC messages
    data_gen: IpcDataGenerator,

    /// Compression state handed to `data_gen` on every write
    compression_context: CompressionContext,
}
1530
1531impl<W: Write> StreamWriter<BufWriter<W>> {
1532    /// Try to create a new stream writer with the writer wrapped in a BufWriter.
1533    ///
1534    /// See [`StreamWriter::try_new`] for an unbuffered version.
1535    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1536        Self::try_new(BufWriter::new(writer), schema)
1537    }
1538}
1539
1540impl<W: Write> StreamWriter<W> {
1541    /// Try to create a new writer, with the schema written as part of the header.
1542    ///
1543    /// Note that there is no internal buffering. See also [`StreamWriter::try_new_buffered`].
1544    ///
1545    /// # Errors
1546    ///
1547    /// An ['Err'](Result::Err) may be returned if writing the header to the writer fails.
1548    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1549        let write_options = IpcWriteOptions::default();
1550        Self::try_new_with_options(writer, schema, write_options)
1551    }
1552
1553    /// Try to create a new writer with [`IpcWriteOptions`].
1554    ///
1555    /// # Errors
1556    ///
1557    /// An ['Err'](Result::Err) may be returned if writing the header to the writer fails.
1558    pub fn try_new_with_options(
1559        mut writer: W,
1560        schema: &Schema,
1561        write_options: IpcWriteOptions,
1562    ) -> Result<Self, ArrowError> {
1563        let data_gen = IpcDataGenerator::default();
1564        let mut dictionary_tracker = DictionaryTracker::new(false);
1565
1566        // write the schema, set the written bytes to the schema
1567        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
1568            schema,
1569            &mut dictionary_tracker,
1570            &write_options,
1571        );
1572        write_message(&mut writer, encoded_message, &write_options)?;
1573        Ok(Self {
1574            writer,
1575            write_options,
1576            finished: false,
1577            dictionary_tracker,
1578            data_gen,
1579            compression_context: CompressionContext::default(),
1580        })
1581    }
1582
1583    /// Write a record batch to the stream
1584    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1585        if self.finished {
1586            return Err(ArrowError::IpcError(
1587                "Cannot write record batch to stream writer as it is closed".to_string(),
1588            ));
1589        }
1590
1591        self.data_gen.write(
1592            batch,
1593            &mut self.dictionary_tracker,
1594            &self.write_options,
1595            &mut self.compression_context,
1596            &mut self.writer,
1597        )?;
1598        Ok(())
1599    }
1600
1601    /// Write continuation bytes, and mark the stream as done
1602    pub fn finish(&mut self) -> Result<(), ArrowError> {
1603        if self.finished {
1604            return Err(ArrowError::IpcError(
1605                "Cannot write footer to stream writer as it is closed".to_string(),
1606            ));
1607        }
1608
1609        write_continuation(&mut self.writer, &self.write_options, 0)?;
1610        self.writer.flush()?;
1611
1612        self.finished = true;
1613
1614        Ok(())
1615    }
1616
1617    /// Gets a reference to the underlying writer.
1618    pub fn get_ref(&self) -> &W {
1619        &self.writer
1620    }
1621
1622    /// Gets a mutable reference to the underlying writer.
1623    ///
1624    /// It is inadvisable to directly write to the underlying writer.
1625    pub fn get_mut(&mut self) -> &mut W {
1626        &mut self.writer
1627    }
1628
1629    /// Flush the underlying writer.
1630    ///
1631    /// Both the BufWriter and the underlying writer are flushed.
1632    pub fn flush(&mut self) -> Result<(), ArrowError> {
1633        self.writer.flush()?;
1634        Ok(())
1635    }
1636
1637    /// Unwraps the the underlying writer.
1638    ///
1639    /// The writer is flushed and the StreamWriter is finished before returning.
1640    ///
1641    /// # Errors
1642    ///
1643    /// An ['Err'](Result::Err) may be returned if an error occurs while finishing the StreamWriter
1644    /// or while flushing the writer.
1645    ///
1646    /// # Example
1647    ///
1648    /// ```
1649    /// # use arrow_ipc::writer::{StreamWriter, IpcWriteOptions};
1650    /// # use arrow_ipc::MetadataVersion;
1651    /// # use arrow_schema::{ArrowError, Schema};
1652    /// # fn main() -> Result<(), ArrowError> {
1653    /// // The result we expect from an empty schema
1654    /// let expected = vec![
1655    ///     255, 255, 255, 255,  48,   0,   0,   0,
1656    ///      16,   0,   0,   0,   0,   0,  10,   0,
1657    ///      12,   0,  10,   0,   9,   0,   4,   0,
1658    ///      10,   0,   0,   0,  16,   0,   0,   0,
1659    ///       0,   1,   4,   0,   8,   0,   8,   0,
1660    ///       0,   0,   4,   0,   8,   0,   0,   0,
1661    ///       4,   0,   0,   0,   0,   0,   0,   0,
1662    ///     255, 255, 255, 255,   0,   0,   0,   0
1663    /// ];
1664    ///
1665    /// let schema = Schema::empty();
1666    /// let buffer: Vec<u8> = Vec::new();
1667    /// let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)?;
1668    /// let stream_writer = StreamWriter::try_new_with_options(buffer, &schema, options)?;
1669    ///
1670    /// assert_eq!(stream_writer.into_inner()?, expected);
1671    /// # Ok(())
1672    /// # }
1673    /// ```
1674    pub fn into_inner(mut self) -> Result<W, ArrowError> {
1675        if !self.finished {
1676            // `finish` flushes.
1677            self.finish()?;
1678        }
1679        Ok(self.writer)
1680    }
1681}
1682
impl<W: Write> RecordBatchWriter for StreamWriter<W> {
    /// Writes `batch` by delegating to the inherent [`StreamWriter::write`].
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        // Method resolution prefers the inherent `write` over this trait method,
        // so this call does not recurse.
        self.write(batch)
    }

    /// Closes the stream by calling [`StreamWriter::finish`]; this returns an error
    /// if the stream was already finished.
    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}
1692
/// Stores an encoded `crate::Message` together with the optional Arrow data
/// (message body) that accompanies it
pub struct EncodedData {
    /// An encoded `crate::Message`
    pub ipc_message: Vec<u8>,
    /// Arrow buffers to be written; should be an empty vec for schema messages
    pub arrow_data: Vec<u8>,
}
1700/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
1701pub fn write_message<W: Write>(
1702    mut writer: W,
1703    encoded: EncodedData,
1704    write_options: &IpcWriteOptions,
1705) -> Result<(usize, usize), ArrowError> {
1706    let arrow_data_len = encoded.arrow_data.len();
1707    if arrow_data_len % usize::from(write_options.alignment) != 0 {
1708        return Err(ArrowError::MemoryError(
1709            "Arrow data not aligned".to_string(),
1710        ));
1711    }
1712
1713    let a = usize::from(write_options.alignment - 1);
1714    let buffer = encoded.ipc_message;
1715    let flatbuf_size = buffer.len();
1716    let prefix_size = if write_options.write_legacy_ipc_format {
1717        4
1718    } else {
1719        8
1720    };
1721    let aligned_size = (flatbuf_size + prefix_size + a) & !a;
1722    let padding_bytes = aligned_size - flatbuf_size - prefix_size;
1723
1724    write_continuation(
1725        &mut writer,
1726        write_options,
1727        (aligned_size - prefix_size) as i32,
1728    )?;
1729
1730    // write the flatbuf
1731    if flatbuf_size > 0 {
1732        writer.write_all(&buffer)?;
1733    }
1734    // write padding
1735    writer.write_all(&PADDING[..padding_bytes])?;
1736
1737    // write arrow data
1738    let body_len = if arrow_data_len > 0 {
1739        write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
1740    } else {
1741        0
1742    };
1743
1744    Ok((aligned_size, body_len))
1745}
1746
1747fn write_body_buffers<W: Write>(
1748    mut writer: W,
1749    data: &[u8],
1750    alignment: u8,
1751) -> Result<usize, ArrowError> {
1752    let len = data.len();
1753    let pad_len = pad_to_alignment(alignment, len);
1754    let total_len = len + pad_len;
1755
1756    // write body buffer
1757    writer.write_all(data)?;
1758    if pad_len > 0 {
1759        writer.write_all(&PADDING[..pad_len])?;
1760    }
1761
1762    Ok(total_len)
1763}
1764
1765/// Write a record batch to the writer, writing the message size before the message
1766/// if the record batch is being written to a stream
1767fn write_continuation<W: Write>(
1768    mut writer: W,
1769    write_options: &IpcWriteOptions,
1770    total_len: i32,
1771) -> Result<usize, ArrowError> {
1772    let mut written = 8;
1773
1774    // the version of the writer determines whether continuation markers should be added
1775    match write_options.metadata_version {
1776        crate::MetadataVersion::V1 | crate::MetadataVersion::V2 | crate::MetadataVersion::V3 => {
1777            unreachable!("Options with the metadata version cannot be created")
1778        }
1779        crate::MetadataVersion::V4 => {
1780            if !write_options.write_legacy_ipc_format {
1781                // v0.15.0 format
1782                writer.write_all(&CONTINUATION_MARKER)?;
1783                written = 4;
1784            }
1785            writer.write_all(&total_len.to_le_bytes()[..])?;
1786        }
1787        crate::MetadataVersion::V5 => {
1788            // write continuation marker and message length
1789            writer.write_all(&CONTINUATION_MARKER)?;
1790            writer.write_all(&total_len.to_le_bytes()[..])?;
1791        }
1792        z => panic!("Unsupported crate::MetadataVersion {z:?}"),
1793    };
1794
1795    Ok(written)
1796}
1797
1798/// In V4, null types have no validity bitmap
1799/// In V5 and later, null and union types have no validity bitmap
1800/// Run end encoded type has no validity bitmap.
1801fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) -> bool {
1802    if write_options.metadata_version < crate::MetadataVersion::V5 {
1803        !matches!(data_type, DataType::Null)
1804    } else {
1805        !matches!(
1806            data_type,
1807            DataType::Null | DataType::Union(_, _) | DataType::RunEndEncoded(_, _)
1808        )
1809    }
1810}
1811
1812/// Whether to truncate the buffer
1813#[inline]
1814fn buffer_need_truncate(
1815    array_offset: usize,
1816    buffer: &Buffer,
1817    spec: &BufferSpec,
1818    min_length: usize,
1819) -> bool {
1820    spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
1821}
1822
1823/// Returns byte width for a buffer spec. Only for `BufferSpec::FixedWidth`.
1824#[inline]
1825fn get_buffer_element_width(spec: &BufferSpec) -> usize {
1826    match spec {
1827        BufferSpec::FixedWidth { byte_width, .. } => *byte_width,
1828        _ => 0,
1829    }
1830}
1831
1832/// Common functionality for re-encoding offsets. Returns the new offsets as well as
1833/// original start offset and length for use in slicing child data.
1834fn reencode_offsets<O: OffsetSizeTrait>(
1835    offsets: &Buffer,
1836    data: &ArrayData,
1837) -> (Buffer, usize, usize) {
1838    let offsets_slice: &[O] = offsets.typed_data::<O>();
1839    let offset_slice = &offsets_slice[data.offset()..data.offset() + data.len() + 1];
1840
1841    let start_offset = offset_slice.first().unwrap();
1842    let end_offset = offset_slice.last().unwrap();
1843
1844    let offsets = match start_offset.as_usize() {
1845        0 => {
1846            let size = size_of::<O>();
1847            offsets.slice_with_length(data.offset() * size, (data.len() + 1) * size)
1848        }
1849        _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
1850    };
1851
1852    let start_offset = start_offset.as_usize();
1853    let end_offset = end_offset.as_usize();
1854
1855    (offsets, start_offset, end_offset - start_offset)
1856}
1857
1858/// Returns the values and offsets [`Buffer`] for a ByteArray with offset type `O`
1859///
1860/// In particular, this handles re-encoding the offsets if they don't start at `0`,
1861/// slicing the values buffer as appropriate. This helps reduce the encoded
1862/// size of sliced arrays, as values that have been sliced away are not encoded
1863fn get_byte_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Buffer) {
1864    if data.is_empty() {
1865        // As per specification, offsets buffer has N+1 elements.
1866        // So an empty array should still be encoded with a single 0 offset.
1867        let mut offsets = MutableBuffer::new(size_of::<O>());
1868        offsets.extend_from_slice(O::usize_as(0).to_byte_slice());
1869        return (offsets.into(), MutableBuffer::new(0).into());
1870    }
1871
1872    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1873    let values = data.buffers()[1].slice_with_length(original_start_offset, len);
1874    (offsets, values)
1875}
1876
1877/// Similar logic as [`get_byte_array_buffers()`] but slices the child array instead
1878/// of a values buffer.
1879fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, ArrayData) {
1880    if data.is_empty() {
1881        // As per specification, offsets buffer has N+1 elements.
1882        // So an empty array should still be encoded with a single 0 offset.
1883        let mut offsets = MutableBuffer::new(size_of::<O>());
1884        offsets.extend_from_slice(O::usize_as(0).to_byte_slice());
1885        return (offsets.into(), data.child_data()[0].slice(0, 0));
1886    }
1887
1888    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1889    let child_data = data.child_data()[0].slice(original_start_offset, len);
1890    (offsets, child_data)
1891}
1892
1893/// Returns the offsets, sizes, and child data buffers for a ListView array.
1894///
1895/// Unlike List arrays, ListView arrays store both offsets and sizes explicitly,
1896/// and offsets can be non-monotonic. When slicing, we simply pass through the
1897/// offsets and sizes without re-encoding, and do not slice the child data.
1898fn get_list_view_array_buffers<O: OffsetSizeTrait>(
1899    data: &ArrayData,
1900) -> (Buffer, Buffer, ArrayData) {
1901    if data.is_empty() {
1902        return (
1903            MutableBuffer::new(0).into(),
1904            MutableBuffer::new(0).into(),
1905            data.child_data()[0].slice(0, 0),
1906        );
1907    }
1908
1909    let offsets = &data.buffers()[0];
1910    let sizes = &data.buffers()[1];
1911
1912    let element_size = std::mem::size_of::<O>();
1913    let offsets_slice =
1914        offsets.slice_with_length(data.offset() * element_size, data.len() * element_size);
1915    let sizes_slice =
1916        sizes.slice_with_length(data.offset() * element_size, data.len() * element_size);
1917
1918    let child_data = data.child_data()[0].clone();
1919
1920    (offsets_slice, sizes_slice, child_data)
1921}
1922
1923/// Returns the sliced views [`Buffer`] for a BinaryView/Utf8View array.
1924///
1925/// The views buffer is sliced to only include views in the valid range based on
1926/// the array's offset and length. This helps reduce the encoded size of sliced
1927/// arrays
1928///
1929fn get_or_truncate_buffer(array_data: &ArrayData) -> Buffer {
1930    let buffer = &array_data.buffers()[0];
1931    let layout = layout(array_data.data_type());
1932    let spec = &layout.buffers[0];
1933
1934    let byte_width = get_buffer_element_width(spec);
1935    let min_length = array_data.len() * byte_width;
1936    if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
1937        let byte_offset = array_data.offset() * byte_width;
1938        let buffer_length = min(min_length, buffer.len() - byte_offset);
1939        buffer.slice_with_length(byte_offset, buffer_length)
1940    } else {
1941        buffer.clone()
1942    }
1943}
1944
1945/// Recursively encodes `array_data` into its IPC representation.
1946///
1947/// Output goes to two separate channels:
1948/// - `buffers` / `nodes`: IPC metadata (offsets, lengths, null counts) that will be
1949///   serialised into the flatbuffer `RecordBatch` header.
1950/// - `sink`: the raw Arrow data bytes that form the IPC message body.
1951#[allow(clippy::too_many_arguments)]
1952fn write_array_data(
1953    array_data: &ArrayData,
1954    buffers: &mut Vec<crate::Buffer>,
1955    sink: &mut IpcBodySink<'_>,
1956    nodes: &mut Vec<crate::FieldNode>,
1957    offset: i64,
1958    num_rows: usize,
1959    null_count: usize,
1960    compression_codec: Option<CompressionCodec>,
1961    compression_context: &mut CompressionContext,
1962    write_options: &IpcWriteOptions,
1963) -> Result<i64, ArrowError> {
1964    let mut offset = offset;
1965    if !matches!(array_data.data_type(), DataType::Null) {
1966        nodes.push(crate::FieldNode::new(num_rows as i64, null_count as i64));
1967    } else {
1968        // NullArray's null_count equals to len, but the `null_count` passed in is from ArrayData
1969        // where null_count is always 0.
1970        nodes.push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
1971    }
1972    if has_validity_bitmap(array_data.data_type(), write_options) {
1973        // write null buffer if exists
1974        let null_buffer = match array_data.nulls() {
1975            None => {
1976                // create a buffer and fill it with valid bits
1977                let num_bytes = bit_util::ceil(num_rows, 8);
1978                let buffer = MutableBuffer::new(num_bytes);
1979                let buffer = buffer.with_bitset(num_bytes, true);
1980                buffer.into()
1981            }
1982            Some(buffer) => buffer.inner().sliced(),
1983        };
1984
1985        offset = encode_sink_buffer(
1986            null_buffer,
1987            buffers,
1988            sink,
1989            offset,
1990            compression_codec,
1991            compression_context,
1992            write_options.alignment,
1993        )?;
1994    }
1995
1996    let data_type = array_data.data_type();
1997    if matches!(data_type, DataType::Binary | DataType::Utf8) {
1998        let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
1999        for buffer in [offsets, values] {
2000            offset = encode_sink_buffer(
2001                buffer,
2002                buffers,
2003                sink,
2004                offset,
2005                compression_codec,
2006                compression_context,
2007                write_options.alignment,
2008            )?;
2009        }
2010    } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
2011        // Slicing the views buffer is safe and easy,
2012        // but pruning unneeded data buffers is much more nuanced since it's complicated to prove that no views reference the pruned buffers
2013        //
2014        // Current implementation just serialize the raw arrays as given and not try to optimize anything.
2015        // If users wants to "compact" the arrays prior to sending them over IPC,
2016        // they should consider the gc API suggested in #5513
2017        let views = get_or_truncate_buffer(array_data);
2018        offset = encode_sink_buffer(
2019            views,
2020            buffers,
2021            sink,
2022            offset,
2023            compression_codec,
2024            compression_context,
2025            write_options.alignment,
2026        )?;
2027
2028        for buffer in array_data.buffers().iter().skip(1) {
2029            offset = encode_sink_buffer(
2030                buffer.clone(),
2031                buffers,
2032                sink,
2033                offset,
2034                compression_codec,
2035                compression_context,
2036                write_options.alignment,
2037            )?;
2038        }
2039    } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
2040        let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
2041        for buffer in [offsets, values] {
2042            offset = encode_sink_buffer(
2043                buffer,
2044                buffers,
2045                sink,
2046                offset,
2047                compression_codec,
2048                compression_context,
2049                write_options.alignment,
2050            )?;
2051        }
2052    } else if DataType::is_numeric(data_type)
2053        || DataType::is_temporal(data_type)
2054        || matches!(
2055            array_data.data_type(),
2056            DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
2057        )
2058    {
2059        // Truncate values
2060        assert_eq!(array_data.buffers().len(), 1);
2061
2062        let buffer = get_or_truncate_buffer(array_data);
2063        offset = encode_sink_buffer(
2064            buffer,
2065            buffers,
2066            sink,
2067            offset,
2068            compression_codec,
2069            compression_context,
2070            write_options.alignment,
2071        )?;
2072    } else if matches!(data_type, DataType::Boolean) {
2073        // Bools are special because the payload (= 1 bit) is smaller than the physical container elements (= bytes).
2074        // The array data may not start at the physical boundary of the underlying buffer, so we need to shift bits around.
2075        assert_eq!(array_data.buffers().len(), 1);
2076
2077        let buffer = &array_data.buffers()[0];
2078        let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
2079        offset = encode_sink_buffer(
2080            buffer,
2081            buffers,
2082            sink,
2083            offset,
2084            compression_codec,
2085            compression_context,
2086            write_options.alignment,
2087        )?;
2088    } else if matches!(
2089        data_type,
2090        DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _)
2091    ) {
2092        assert_eq!(array_data.buffers().len(), 1);
2093        assert_eq!(array_data.child_data().len(), 1);
2094
2095        // Truncate offsets and the child data to avoid writing unnecessary data
2096        let (offsets, sliced_child_data) = match data_type {
2097            DataType::List(_) => get_list_array_buffers::<i32>(array_data),
2098            DataType::Map(_, _) => get_list_array_buffers::<i32>(array_data),
2099            DataType::LargeList(_) => get_list_array_buffers::<i64>(array_data),
2100            _ => unreachable!(),
2101        };
2102        offset = encode_sink_buffer(
2103            offsets,
2104            buffers,
2105            sink,
2106            offset,
2107            compression_codec,
2108            compression_context,
2109            write_options.alignment,
2110        )?;
2111        offset = write_array_data(
2112            &sliced_child_data,
2113            buffers,
2114            sink,
2115            nodes,
2116            offset,
2117            sliced_child_data.len(),
2118            sliced_child_data.null_count(),
2119            compression_codec,
2120            compression_context,
2121            write_options,
2122        )?;
2123        return Ok(offset);
2124    } else if matches!(
2125        data_type,
2126        DataType::ListView(_) | DataType::LargeListView(_)
2127    ) {
2128        assert_eq!(array_data.buffers().len(), 2); // offsets + sizes
2129        assert_eq!(array_data.child_data().len(), 1);
2130
2131        let (offsets, sizes, child_data) = match data_type {
2132            DataType::ListView(_) => get_list_view_array_buffers::<i32>(array_data),
2133            DataType::LargeListView(_) => get_list_view_array_buffers::<i64>(array_data),
2134            _ => unreachable!(),
2135        };
2136
2137        offset = encode_sink_buffer(
2138            offsets,
2139            buffers,
2140            sink,
2141            offset,
2142            compression_codec,
2143            compression_context,
2144            write_options.alignment,
2145        )?;
2146        offset = encode_sink_buffer(
2147            sizes,
2148            buffers,
2149            sink,
2150            offset,
2151            compression_codec,
2152            compression_context,
2153            write_options.alignment,
2154        )?;
2155
2156        offset = write_array_data(
2157            &child_data,
2158            buffers,
2159            sink,
2160            nodes,
2161            offset,
2162            child_data.len(),
2163            child_data.null_count(),
2164            compression_codec,
2165            compression_context,
2166            write_options,
2167        )?;
2168        return Ok(offset);
2169    } else if let DataType::FixedSizeList(_, fixed_size) = data_type {
2170        assert_eq!(array_data.child_data().len(), 1);
2171        let fixed_size = *fixed_size as usize;
2172
2173        let child_offset = array_data.offset() * fixed_size;
2174        let child_length = array_data.len() * fixed_size;
2175        let child_data = array_data.child_data()[0].slice(child_offset, child_length);
2176
2177        offset = write_array_data(
2178            &child_data,
2179            buffers,
2180            sink,
2181            nodes,
2182            offset,
2183            child_data.len(),
2184            child_data.null_count(),
2185            compression_codec,
2186            compression_context,
2187            write_options,
2188        )?;
2189        return Ok(offset);
2190    } else {
2191        for buffer in array_data.buffers() {
2192            offset = encode_sink_buffer(
2193                buffer.clone(),
2194                buffers,
2195                sink,
2196                offset,
2197                compression_codec,
2198                compression_context,
2199                write_options.alignment,
2200            )?;
2201        }
2202    }
2203
2204    match array_data.data_type() {
2205        DataType::Dictionary(_, _) => {}
2206        DataType::RunEndEncoded(_, _) => {
2207            // unslice the run encoded array.
2208            let arr = unslice_run_array(array_data.clone())?;
2209            // recursively write out nested structures
2210            for data_ref in arr.child_data() {
2211                // write the nested data (e.g list data)
2212                offset = write_array_data(
2213                    data_ref,
2214                    buffers,
2215                    sink,
2216                    nodes,
2217                    offset,
2218                    data_ref.len(),
2219                    data_ref.null_count(),
2220                    compression_codec,
2221                    compression_context,
2222                    write_options,
2223                )?;
2224            }
2225        }
2226        _ => {
2227            // recursively write out nested structures
2228            for data_ref in array_data.child_data() {
2229                // write the nested data (e.g list data)
2230                offset = write_array_data(
2231                    data_ref,
2232                    buffers,
2233                    sink,
2234                    nodes,
2235                    offset,
2236                    data_ref.len(),
2237                    data_ref.null_count(),
2238                    compression_codec,
2239                    compression_context,
2240                    write_options,
2241                )?;
2242            }
2243        }
2244    }
2245    Ok(offset)
2246}
2247
2248/// Encodes a single Arrow [`Buffer`] into the IPC body and records its metadata.
2249///
2250/// - `buffer`: the Arrow data buffer to encode (validity bitmap, offsets, values, etc.)
2251/// - `buffers`: in-progress list of IPC `Buffer` metadata entries (body offset + length) that
2252///   will eventually be serialised into the flatbuffer `RecordBatch` header.
2253/// - `sink`: destination for the actual encoded bytes; either a contiguous `Vec<u8>` for
2254///   in-memory writes, or a list of [`EncodedBuffer`] segments for deferred zero-copy streaming.
2255/// - `offset`: running byte offset into the IPC message body, used to compute the metadata entry.
2256/// - `compression_codec` / `compression_context`: if `Some`, the buffer is compressed before
2257///   writing; `compression_context` provides reusable scratch space across calls.
2258/// - `alignment`: each buffer is padded to this many bytes so the next buffer starts aligned.
2259///
2260/// Returns the updated `offset` (advanced by the encoded length plus any alignment padding).
2261fn encode_sink_buffer(
2262    buffer: Buffer,
2263    buffers: &mut Vec<crate::Buffer>,
2264    sink: &mut IpcBodySink<'_>,
2265    offset: i64,
2266    compression_codec: Option<CompressionCodec>,
2267    compression_context: &mut CompressionContext,
2268    alignment: u8,
2269) -> Result<i64, ArrowError> {
2270    let (encoded, len) = match compression_codec {
2271        None => {
2272            let len = buffer.len() as i64;
2273            (EncodedBuffer::Raw(buffer), len)
2274        }
2275        Some(codec) => {
2276            let mut scratch = Vec::new();
2277            let written =
2278                codec.compress_to_vec(buffer.as_slice(), &mut scratch, compression_context)?;
2279            let len = i64::try_from(written)
2280                .map_err(|e| ArrowError::InvalidArgumentError(format!("{e}")))?;
2281            (EncodedBuffer::Compressed(scratch), len)
2282        }
2283    };
2284
2285    let pad_len = pad_to_alignment(alignment, len as usize);
2286    sink.write(pad_len, encoded);
2287    buffers.push(crate::Buffer::new(offset, len));
2288    Ok(offset + len + pad_len as i64)
2289}
2290
/// Block of zero bytes that padding is sliced from when writing; a single padding
/// run can therefore be at most 64 bytes long.
const PADDING: [u8; 64] = [0; 64];
2292
2293/// Estimates the number of [`EncodedBuffer`] segments that [`write_array_data`]
2294/// will produce for a column of the given type.
2295///
2296/// Based on the Arrow IPC buffer layout
2297/// (<https://arrow.apache.org/docs/format/Columnar.html#recordbatch-message>):
2298#[inline]
2299fn estimate_encoded_buffer_count(dt: &DataType) -> usize {
2300    match dt {
2301        DataType::Null => 0,
2302
2303        DataType::Binary | DataType::Utf8 | DataType::LargeBinary | DataType::LargeUtf8 => 3,
2304
2305        DataType::BinaryView | DataType::Utf8View => 3,
2306
2307        DataType::List(f) | DataType::LargeList(f) | DataType::Map(f, _) => {
2308            2 + estimate_encoded_buffer_count(f.data_type())
2309        }
2310
2311        DataType::ListView(f) | DataType::LargeListView(f) => {
2312            3 + estimate_encoded_buffer_count(f.data_type())
2313        }
2314
2315        DataType::FixedSizeList(f, _) => 1 + estimate_encoded_buffer_count(f.data_type()),
2316
2317        DataType::Struct(fields) => {
2318            1 + fields
2319                .iter()
2320                .map(|f| estimate_encoded_buffer_count(f.data_type()))
2321                .sum::<usize>()
2322        }
2323
2324        // Dictionary indices only; dictionary body is a separate IPC message.
2325        DataType::Dictionary(_, _) => 2,
2326
2327        DataType::Union(fields, UnionMode::Sparse) => {
2328            1 + fields
2329                .iter()
2330                .map(|(_, f)| estimate_encoded_buffer_count(f.data_type()))
2331                .sum::<usize>()
2332        }
2333        DataType::Union(fields, UnionMode::Dense) => {
2334            2 + fields
2335                .iter()
2336                .map(|(_, f)| estimate_encoded_buffer_count(f.data_type()))
2337                .sum::<usize>()
2338        }
2339
2340        DataType::RunEndEncoded(run_ends, values) => {
2341            estimate_encoded_buffer_count(run_ends.data_type())
2342                + estimate_encoded_buffer_count(values.data_type())
2343        }
2344        // Primitive, Bool, temporal, Decimal*, FixedSizeBinary: validity + values.
2345        _ => 2,
2346    }
2347}
2348
/// Returns how many padding bytes must follow `len` bytes so that the total is a
/// multiple of `alignment`.
///
/// The rounding is done with a bit mask, so `alignment` is expected to be a
/// non-zero power of two.
#[inline]
fn pad_to_alignment(alignment: u8, len: usize) -> usize {
    let mask = usize::from(alignment - 1);
    let rounded_up = (len + mask) & !mask;
    rounded_up - len
}
2355
2356#[cfg(test)]
2357mod tests {
2358    use std::hash::Hasher;
2359    use std::io::Cursor;
2360    use std::io::Seek;
2361
2362    use arrow_array::builder::FixedSizeListBuilder;
2363    use arrow_array::builder::Float32Builder;
2364    use arrow_array::builder::Int64Builder;
2365    use arrow_array::builder::MapBuilder;
2366    use arrow_array::builder::StringViewBuilder;
2367    use arrow_array::builder::UnionBuilder;
2368    use arrow_array::builder::{
2369        GenericListBuilder, GenericListViewBuilder, ListBuilder, StringBuilder,
2370    };
2371    use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
2372    use arrow_array::types::*;
2373    use arrow_buffer::ScalarBuffer;
2374
2375    use crate::MetadataVersion;
2376    use crate::convert::fb_to_schema;
2377    use crate::reader::*;
2378    use crate::root_as_footer;
2379
2380    use super::*;
2381
2382    fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
2383        let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
2384        writer.write(rb).unwrap();
2385        writer.finish().unwrap();
2386        writer.into_inner().unwrap()
2387    }
2388
2389    fn deserialize_file(bytes: Vec<u8>) -> RecordBatch {
2390        let mut reader = FileReader::try_new(Cursor::new(bytes), None).unwrap();
2391        reader.next().unwrap().unwrap()
2392    }
2393
2394    fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
2395        // Use 8-byte alignment so that the various `truncate_*` tests can be compactly written,
2396        // without needing to construct a giant array to spill over the 64-byte default alignment
2397        // boundary.
2398        const IPC_ALIGNMENT: usize = 8;
2399
2400        let mut stream_writer = StreamWriter::try_new_with_options(
2401            vec![],
2402            record.schema_ref(),
2403            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
2404        )
2405        .unwrap();
2406        stream_writer.write(record).unwrap();
2407        stream_writer.finish().unwrap();
2408        stream_writer.into_inner().unwrap()
2409    }
2410
2411    fn deserialize_stream(bytes: Vec<u8>) -> RecordBatch {
2412        let mut stream_reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
2413        stream_reader.next().unwrap().unwrap()
2414    }
2415
2416    #[test]
2417    #[cfg(feature = "lz4")]
2418    fn test_write_empty_record_batch_lz4_compression() {
2419        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2420        let values: Vec<Option<i32>> = vec![];
2421        let array = Int32Array::from(values);
2422        let record_batch =
2423            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2424
2425        let mut file = tempfile::tempfile().unwrap();
2426
2427        {
2428            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2429                .unwrap()
2430                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
2431                .unwrap();
2432
2433            let mut writer =
2434                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2435            writer.write(&record_batch).unwrap();
2436            writer.finish().unwrap();
2437        }
2438        file.rewind().unwrap();
2439        {
2440            // read file
2441            let reader = FileReader::try_new(file, None).unwrap();
2442            for read_batch in reader {
2443                read_batch
2444                    .unwrap()
2445                    .columns()
2446                    .iter()
2447                    .zip(record_batch.columns())
2448                    .for_each(|(a, b)| {
2449                        assert_eq!(a.data_type(), b.data_type());
2450                        assert_eq!(a.len(), b.len());
2451                        assert_eq!(a.null_count(), b.null_count());
2452                    });
2453            }
2454        }
2455    }
2456
2457    #[test]
2458    #[cfg(feature = "lz4")]
2459    fn test_write_file_with_lz4_compression() {
2460        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2461        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
2462        let array = Int32Array::from(values);
2463        let record_batch =
2464            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2465
2466        let mut file = tempfile::tempfile().unwrap();
2467        {
2468            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2469                .unwrap()
2470                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
2471                .unwrap();
2472
2473            let mut writer =
2474                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2475            writer.write(&record_batch).unwrap();
2476            writer.finish().unwrap();
2477        }
2478        file.rewind().unwrap();
2479        {
2480            // read file
2481            let reader = FileReader::try_new(file, None).unwrap();
2482            for read_batch in reader {
2483                read_batch
2484                    .unwrap()
2485                    .columns()
2486                    .iter()
2487                    .zip(record_batch.columns())
2488                    .for_each(|(a, b)| {
2489                        assert_eq!(a.data_type(), b.data_type());
2490                        assert_eq!(a.len(), b.len());
2491                        assert_eq!(a.null_count(), b.null_count());
2492                    });
2493            }
2494        }
2495    }
2496
2497    #[test]
2498    #[cfg(feature = "zstd")]
2499    fn test_write_file_with_zstd_compression() {
2500        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2501        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
2502        let array = Int32Array::from(values);
2503        let record_batch =
2504            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2505        let mut file = tempfile::tempfile().unwrap();
2506        {
2507            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2508                .unwrap()
2509                .try_with_compression(Some(crate::CompressionType::ZSTD))
2510                .unwrap();
2511
2512            let mut writer =
2513                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2514            writer.write(&record_batch).unwrap();
2515            writer.finish().unwrap();
2516        }
2517        file.rewind().unwrap();
2518        {
2519            // read file
2520            let reader = FileReader::try_new(file, None).unwrap();
2521            for read_batch in reader {
2522                read_batch
2523                    .unwrap()
2524                    .columns()
2525                    .iter()
2526                    .zip(record_batch.columns())
2527                    .for_each(|(a, b)| {
2528                        assert_eq!(a.data_type(), b.data_type());
2529                        assert_eq!(a.len(), b.len());
2530                        assert_eq!(a.null_count(), b.null_count());
2531                    });
2532            }
2533        }
2534    }
2535
2536    #[test]
2537    fn test_write_file() {
2538        let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
2539        let values: Vec<Option<u32>> = vec![
2540            Some(999),
2541            None,
2542            Some(235),
2543            Some(123),
2544            None,
2545            None,
2546            None,
2547            None,
2548            None,
2549        ];
2550        let array1 = UInt32Array::from(values);
2551        let batch =
2552            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array1) as ArrayRef])
2553                .unwrap();
2554        let mut file = tempfile::tempfile().unwrap();
2555        {
2556            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2557
2558            writer.write(&batch).unwrap();
2559            writer.finish().unwrap();
2560        }
2561        file.rewind().unwrap();
2562
2563        {
2564            let mut reader = FileReader::try_new(file, None).unwrap();
2565            while let Some(Ok(read_batch)) = reader.next() {
2566                read_batch
2567                    .columns()
2568                    .iter()
2569                    .zip(batch.columns())
2570                    .for_each(|(a, b)| {
2571                        assert_eq!(a.data_type(), b.data_type());
2572                        assert_eq!(a.len(), b.len());
2573                        assert_eq!(a.null_count(), b.null_count());
2574                    });
2575            }
2576        }
2577    }
2578
2579    #[test]
2580    fn test_empty_utf8_ipc_writes_nonempty_offsets_buffer() {
2581        let name = StringArray::from(Vec::<String>::new());
2582        let (offsets, values) = get_byte_array_buffers::<i32>(&name.to_data());
2583
2584        assert_eq!(name.len(), 0);
2585        assert_eq!(
2586            offsets.len(),
2587            std::mem::size_of::<i32>(),
2588            "offsets buffer should contain one zero i32 offset"
2589        );
2590        assert_eq!(values.len(), 0, "values buffer should remain empty");
2591    }
2592
2593    #[test]
2594    fn test_empty_large_utf8_ipc_writes_nonempty_offsets_buffer() {
2595        let name = LargeStringArray::from(Vec::<String>::new());
2596        let (offsets, values) = get_byte_array_buffers::<i64>(&name.to_data());
2597
2598        assert_eq!(name.len(), 0);
2599        assert_eq!(
2600            offsets.len(),
2601            std::mem::size_of::<i64>(),
2602            "offsets buffer should contain one zero i64 offset"
2603        );
2604        assert_eq!(values.len(), 0, "values buffer should remain empty");
2605    }
2606
2607    #[test]
2608    fn test_empty_list_ipc_writes_nonempty_offsets_buffer() {
2609        let list = GenericListBuilder::<i32, _>::new(UInt32Builder::new()).finish();
2610        let (offsets, child_data) = get_list_array_buffers::<i32>(&list.to_data());
2611
2612        assert_eq!(list.len(), 0);
2613        assert_eq!(
2614            offsets.len(),
2615            std::mem::size_of::<i32>(),
2616            "offsets buffer should contain one zero i32 offset"
2617        );
2618        assert_eq!(child_data.len(), 0, "child data should remain empty");
2619    }
2620
2621    #[test]
2622    fn test_empty_large_list_ipc_writes_nonempty_offsets_buffer() {
2623        let list = GenericListBuilder::<i64, _>::new(UInt32Builder::new()).finish();
2624        let (offsets, child_data) = get_list_array_buffers::<i64>(&list.to_data());
2625
2626        assert_eq!(list.len(), 0);
2627        assert_eq!(
2628            offsets.len(),
2629            std::mem::size_of::<i64>(),
2630            "offsets buffer should contain one zero i64 offset"
2631        );
2632        assert_eq!(child_data.len(), 0, "child data should remain empty");
2633    }
2634
2635    fn write_null_file(options: IpcWriteOptions) {
2636        let schema = Schema::new(vec![
2637            Field::new("nulls", DataType::Null, true),
2638            Field::new("int32s", DataType::Int32, false),
2639            Field::new("nulls2", DataType::Null, true),
2640            Field::new("f64s", DataType::Float64, false),
2641        ]);
2642        let array1 = NullArray::new(32);
2643        let array2 = Int32Array::from(vec![1; 32]);
2644        let array3 = NullArray::new(32);
2645        let array4 = Float64Array::from(vec![f64::NAN; 32]);
2646        let batch = RecordBatch::try_new(
2647            Arc::new(schema.clone()),
2648            vec![
2649                Arc::new(array1) as ArrayRef,
2650                Arc::new(array2) as ArrayRef,
2651                Arc::new(array3) as ArrayRef,
2652                Arc::new(array4) as ArrayRef,
2653            ],
2654        )
2655        .unwrap();
2656        let mut file = tempfile::tempfile().unwrap();
2657        {
2658            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2659
2660            writer.write(&batch).unwrap();
2661            writer.finish().unwrap();
2662        }
2663
2664        file.rewind().unwrap();
2665
2666        {
2667            let reader = FileReader::try_new(file, None).unwrap();
2668            reader.for_each(|maybe_batch| {
2669                maybe_batch
2670                    .unwrap()
2671                    .columns()
2672                    .iter()
2673                    .zip(batch.columns())
2674                    .for_each(|(a, b)| {
2675                        assert_eq!(a.data_type(), b.data_type());
2676                        assert_eq!(a.len(), b.len());
2677                        assert_eq!(a.null_count(), b.null_count());
2678                    });
2679            });
2680        }
2681    }
2682    #[test]
2683    fn test_write_null_file_v4() {
2684        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2685        write_null_file(IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap());
2686        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V4).unwrap());
2687        write_null_file(IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap());
2688    }
2689
2690    #[test]
2691    fn test_write_null_file_v5() {
2692        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2693        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V5).unwrap());
2694    }
2695
2696    #[test]
2697    fn track_union_nested_dict() {
2698        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2699
2700        let array = Arc::new(inner) as ArrayRef;
2701
2702        // Dict field with id 2
2703        #[allow(deprecated)]
2704        let dctfield = Field::new_dict("dict", array.data_type().clone(), false, 0, false);
2705        let union_fields = [(0, Arc::new(dctfield))].into_iter().collect();
2706
2707        let types = [0, 0, 0].into_iter().collect::<ScalarBuffer<i8>>();
2708        let offsets = [0, 1, 2].into_iter().collect::<ScalarBuffer<i32>>();
2709
2710        let union = UnionArray::try_new(union_fields, types, Some(offsets), vec![array]).unwrap();
2711
2712        let schema = Arc::new(Schema::new(vec![Field::new(
2713            "union",
2714            union.data_type().clone(),
2715            false,
2716        )]));
2717
2718        let r#gen = IpcDataGenerator::default();
2719        let mut dict_tracker = DictionaryTracker::new(false);
2720        r#gen.schema_to_bytes_with_dictionary_tracker(
2721            &schema,
2722            &mut dict_tracker,
2723            &IpcWriteOptions::default(),
2724        );
2725
2726        let batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
2727
2728        r#gen
2729            .encode(
2730                &batch,
2731                &mut dict_tracker,
2732                &Default::default(),
2733                &mut Default::default(),
2734            )
2735            .unwrap();
2736
2737        // The encoder will assign dict IDs itself to ensure uniqueness and ignore the dict ID in the schema
2738        // so we expect the dict will be keyed to 0
2739        assert!(dict_tracker.written.contains_key(&0));
2740    }
2741
2742    #[test]
2743    fn track_struct_nested_dict() {
2744        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2745
2746        let array = Arc::new(inner) as ArrayRef;
2747
2748        // Dict field with id 2
2749        #[allow(deprecated)]
2750        let dctfield = Arc::new(Field::new_dict(
2751            "dict",
2752            array.data_type().clone(),
2753            false,
2754            2,
2755            false,
2756        ));
2757
2758        let s = StructArray::from(vec![(dctfield, array)]);
2759        let struct_array = Arc::new(s) as ArrayRef;
2760
2761        let schema = Arc::new(Schema::new(vec![Field::new(
2762            "struct",
2763            struct_array.data_type().clone(),
2764            false,
2765        )]));
2766
2767        let r#gen = IpcDataGenerator::default();
2768        let mut dict_tracker = DictionaryTracker::new(false);
2769        r#gen.schema_to_bytes_with_dictionary_tracker(
2770            &schema,
2771            &mut dict_tracker,
2772            &IpcWriteOptions::default(),
2773        );
2774
2775        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2776
2777        r#gen
2778            .encode(
2779                &batch,
2780                &mut dict_tracker,
2781                &Default::default(),
2782                &mut Default::default(),
2783            )
2784            .unwrap();
2785
2786        assert!(dict_tracker.written.contains_key(&0));
2787    }
2788
2789    fn write_union_file(options: IpcWriteOptions) {
2790        let schema = Schema::new(vec![Field::new_union(
2791            "union",
2792            vec![0, 1],
2793            vec![
2794                Field::new("a", DataType::Int32, false),
2795                Field::new("c", DataType::Float64, false),
2796            ],
2797            UnionMode::Sparse,
2798        )]);
2799        let mut builder = UnionBuilder::with_capacity_sparse(5);
2800        builder.append::<Int32Type>("a", 1).unwrap();
2801        builder.append_null::<Int32Type>("a").unwrap();
2802        builder.append::<Float64Type>("c", 3.0).unwrap();
2803        builder.append_null::<Float64Type>("c").unwrap();
2804        builder.append::<Int32Type>("a", 4).unwrap();
2805        let union = builder.build().unwrap();
2806
2807        let batch =
2808            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(union) as ArrayRef])
2809                .unwrap();
2810
2811        let mut file = tempfile::tempfile().unwrap();
2812        {
2813            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2814
2815            writer.write(&batch).unwrap();
2816            writer.finish().unwrap();
2817        }
2818        file.rewind().unwrap();
2819
2820        {
2821            let reader = FileReader::try_new(file, None).unwrap();
2822            reader.for_each(|maybe_batch| {
2823                maybe_batch
2824                    .unwrap()
2825                    .columns()
2826                    .iter()
2827                    .zip(batch.columns())
2828                    .for_each(|(a, b)| {
2829                        assert_eq!(a.data_type(), b.data_type());
2830                        assert_eq!(a.len(), b.len());
2831                        assert_eq!(a.null_count(), b.null_count());
2832                    });
2833            });
2834        }
2835    }
2836
2837    #[test]
2838    fn test_write_union_file_v4_v5() {
2839        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2840        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2841    }
2842
2843    #[test]
2844    fn test_write_view_types() {
2845        const LONG_TEST_STRING: &str =
2846            "This is a long string to make sure binary view array handles it";
2847        let schema = Schema::new(vec![
2848            Field::new("field1", DataType::BinaryView, true),
2849            Field::new("field2", DataType::Utf8View, true),
2850        ]);
2851        let values: Vec<Option<&[u8]>> = vec![
2852            Some(b"foo"),
2853            Some(b"bar"),
2854            Some(LONG_TEST_STRING.as_bytes()),
2855        ];
2856        let binary_array = BinaryViewArray::from_iter(values);
2857        let utf8_array =
2858            StringViewArray::from_iter(vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]);
2859        let record_batch = RecordBatch::try_new(
2860            Arc::new(schema.clone()),
2861            vec![Arc::new(binary_array), Arc::new(utf8_array)],
2862        )
2863        .unwrap();
2864
2865        let mut file = tempfile::tempfile().unwrap();
2866        {
2867            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2868            writer.write(&record_batch).unwrap();
2869            writer.finish().unwrap();
2870        }
2871        file.rewind().unwrap();
2872        {
2873            let mut reader = FileReader::try_new(&file, None).unwrap();
2874            let read_batch = reader.next().unwrap().unwrap();
2875            read_batch
2876                .columns()
2877                .iter()
2878                .zip(record_batch.columns())
2879                .for_each(|(a, b)| {
2880                    assert_eq!(a, b);
2881                });
2882        }
2883        file.rewind().unwrap();
2884        {
2885            let mut reader = FileReader::try_new(&file, Some(vec![0])).unwrap();
2886            let read_batch = reader.next().unwrap().unwrap();
2887            assert_eq!(read_batch.num_columns(), 1);
2888            let read_array = read_batch.column(0);
2889            let write_array = record_batch.column(0);
2890            assert_eq!(read_array, write_array);
2891        }
2892    }
2893
2894    #[test]
2895    fn truncate_ipc_record_batch() {
2896        fn create_batch(rows: usize) -> RecordBatch {
2897            let schema = Schema::new(vec![
2898                Field::new("a", DataType::Int32, false),
2899                Field::new("b", DataType::Utf8, false),
2900            ]);
2901
2902            let a = Int32Array::from_iter_values(0..rows as i32);
2903            let b = StringArray::from_iter_values((0..rows).map(|i| i.to_string()));
2904
2905            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
2906        }
2907
2908        let big_record_batch = create_batch(65536);
2909
2910        let length = 5;
2911        let small_record_batch = create_batch(length);
2912
2913        let offset = 2;
2914        let record_batch_slice = big_record_batch.slice(offset, length);
2915        assert!(
2916            serialize_stream(&big_record_batch).len() > serialize_stream(&small_record_batch).len()
2917        );
2918        assert_eq!(
2919            serialize_stream(&small_record_batch).len(),
2920            serialize_stream(&record_batch_slice).len()
2921        );
2922
2923        assert_eq!(
2924            deserialize_stream(serialize_stream(&record_batch_slice)),
2925            record_batch_slice
2926        );
2927    }
2928
2929    #[test]
2930    fn truncate_ipc_record_batch_with_nulls() {
2931        fn create_batch() -> RecordBatch {
2932            let schema = Schema::new(vec![
2933                Field::new("a", DataType::Int32, true),
2934                Field::new("b", DataType::Utf8, true),
2935            ]);
2936
2937            let a = Int32Array::from(vec![Some(1), None, Some(1), None, Some(1)]);
2938            let b = StringArray::from(vec![None, Some("a"), Some("a"), None, Some("a")]);
2939
2940            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
2941        }
2942
2943        let record_batch = create_batch();
2944        let record_batch_slice = record_batch.slice(1, 2);
2945        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2946
2947        assert!(
2948            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2949        );
2950
2951        assert!(deserialized_batch.column(0).is_null(0));
2952        assert!(deserialized_batch.column(0).is_valid(1));
2953        assert!(deserialized_batch.column(1).is_valid(0));
2954        assert!(deserialized_batch.column(1).is_valid(1));
2955
2956        assert_eq!(record_batch_slice, deserialized_batch);
2957    }
2958
2959    #[test]
2960    fn truncate_ipc_dictionary_array() {
2961        fn create_batch() -> RecordBatch {
2962            let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
2963                .into_iter()
2964                .collect();
2965            let keys: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
2966
2967            let array = DictionaryArray::new(keys, Arc::new(values));
2968
2969            let schema = Schema::new(vec![Field::new("dict", array.data_type().clone(), true)]);
2970
2971            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
2972        }
2973
2974        let record_batch = create_batch();
2975        let record_batch_slice = record_batch.slice(1, 2);
2976        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2977
2978        assert!(
2979            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2980        );
2981
2982        assert!(deserialized_batch.column(0).is_valid(0));
2983        assert!(deserialized_batch.column(0).is_null(1));
2984
2985        assert_eq!(record_batch_slice, deserialized_batch);
2986    }
2987
2988    #[test]
2989    fn truncate_ipc_struct_array() {
2990        fn create_batch() -> RecordBatch {
2991            let strings: StringArray = [Some("foo"), None, Some("bar"), Some("baz")]
2992                .into_iter()
2993                .collect();
2994            let ints: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
2995
2996            let struct_array = StructArray::from(vec![
2997                (
2998                    Arc::new(Field::new("s", DataType::Utf8, true)),
2999                    Arc::new(strings) as ArrayRef,
3000                ),
3001                (
3002                    Arc::new(Field::new("c", DataType::Int32, true)),
3003                    Arc::new(ints) as ArrayRef,
3004                ),
3005            ]);
3006
3007            let schema = Schema::new(vec![Field::new(
3008                "struct_array",
3009                struct_array.data_type().clone(),
3010                true,
3011            )]);
3012
3013            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(struct_array)]).unwrap()
3014        }
3015
3016        let record_batch = create_batch();
3017        let record_batch_slice = record_batch.slice(1, 2);
3018        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
3019
3020        assert!(
3021            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
3022        );
3023
3024        let structs = deserialized_batch
3025            .column(0)
3026            .as_any()
3027            .downcast_ref::<StructArray>()
3028            .unwrap();
3029
3030        assert!(structs.column(0).is_null(0));
3031        assert!(structs.column(0).is_valid(1));
3032        assert!(structs.column(1).is_valid(0));
3033        assert!(structs.column(1).is_null(1));
3034        assert_eq!(record_batch_slice, deserialized_batch);
3035    }
3036
3037    #[test]
3038    fn truncate_ipc_string_array_with_all_empty_string() {
3039        fn create_batch() -> RecordBatch {
3040            let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
3041            let a = StringArray::from(vec![Some(""), Some(""), Some(""), Some(""), Some("")]);
3042            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap()
3043        }
3044
3045        let record_batch = create_batch();
3046        let record_batch_slice = record_batch.slice(0, 1);
3047        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
3048
3049        assert!(
3050            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
3051        );
3052        assert_eq!(record_batch_slice, deserialized_batch);
3053    }
3054
3055    #[test]
3056    fn test_stream_writer_writes_array_slice() {
3057        let array = UInt32Array::from(vec![Some(1), Some(2), Some(3)]);
3058        assert_eq!(
3059            vec![Some(1), Some(2), Some(3)],
3060            array.iter().collect::<Vec<_>>()
3061        );
3062
3063        let sliced = array.slice(1, 2);
3064        assert_eq!(vec![Some(2), Some(3)], sliced.iter().collect::<Vec<_>>());
3065
3066        let batch = RecordBatch::try_new(
3067            Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)])),
3068            vec![Arc::new(sliced)],
3069        )
3070        .expect("new batch");
3071
3072        let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
3073        writer.write(&batch).expect("write");
3074        let outbuf = writer.into_inner().expect("inner");
3075
3076        let mut reader = StreamReader::try_new(&outbuf[..], None).expect("new reader");
3077        let read_batch = reader.next().unwrap().expect("read batch");
3078
3079        let read_array: &UInt32Array = read_batch.column(0).as_primitive();
3080        assert_eq!(
3081            vec![Some(2), Some(3)],
3082            read_array.iter().collect::<Vec<_>>()
3083        );
3084    }
3085
3086    #[test]
3087    fn test_large_slice_uint32() {
3088        ensure_roundtrip(Arc::new(UInt32Array::from_iter(
3089            (0..8000).map(|i| if i % 2 == 0 { Some(i) } else { None }),
3090        )));
3091    }
3092
3093    #[test]
3094    fn test_large_slice_string() {
3095        let strings: Vec<_> = (0..8000)
3096            .map(|i| {
3097                if i % 2 == 0 {
3098                    Some(format!("value{i}"))
3099                } else {
3100                    None
3101                }
3102            })
3103            .collect();
3104
3105        ensure_roundtrip(Arc::new(StringArray::from(strings)));
3106    }
3107
3108    #[test]
3109    fn test_large_slice_string_list() {
3110        let mut ls = ListBuilder::new(StringBuilder::new());
3111
3112        let mut s = String::new();
3113        for row_number in 0..8000 {
3114            if row_number % 2 == 0 {
3115                for list_element in 0..1000 {
3116                    s.clear();
3117                    use std::fmt::Write;
3118                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
3119                    ls.values().append_value(&s);
3120                }
3121                ls.append(true)
3122            } else {
3123                ls.append(false); // null
3124            }
3125        }
3126
3127        ensure_roundtrip(Arc::new(ls.finish()));
3128    }
3129
3130    #[test]
3131    fn test_large_slice_string_list_of_lists() {
3132        // The reason for the special test is to verify reencode_offsets which looks both at
3133        // the starting offset and the data offset.  So need a dataset where the starting_offset
3134        // is zero but the data offset is not.
3135        let mut ls = ListBuilder::new(ListBuilder::new(StringBuilder::new()));
3136
3137        for _ in 0..4000 {
3138            ls.values().append(true);
3139            ls.append(true)
3140        }
3141
3142        let mut s = String::new();
3143        for row_number in 0..4000 {
3144            if row_number % 2 == 0 {
3145                for list_element in 0..1000 {
3146                    s.clear();
3147                    use std::fmt::Write;
3148                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
3149                    ls.values().values().append_value(&s);
3150                }
3151                ls.values().append(true);
3152                ls.append(true)
3153            } else {
3154                ls.append(false); // null
3155            }
3156        }
3157
3158        ensure_roundtrip(Arc::new(ls.finish()));
3159    }
3160
3161    /// Read/write a record batch to a File and Stream and ensure it is the same at the outout
3162    fn ensure_roundtrip(array: ArrayRef) {
3163        let num_rows = array.len();
3164        let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
3165        // take off the first element
3166        let sliced_batch = orig_batch.slice(1, num_rows - 1);
3167
3168        let schema = orig_batch.schema();
3169        let stream_data = {
3170            let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
3171            writer.write(&sliced_batch).unwrap();
3172            writer.into_inner().unwrap()
3173        };
3174        let read_batch = {
3175            let projection = None;
3176            let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
3177            reader
3178                .next()
3179                .expect("expect no errors reading batch")
3180                .expect("expect batch")
3181        };
3182        assert_eq!(sliced_batch, read_batch);
3183
3184        let file_data = {
3185            let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
3186            writer.write(&sliced_batch).unwrap();
3187            writer.into_inner().unwrap().into_inner().unwrap()
3188        };
3189        let read_batch = {
3190            let projection = None;
3191            let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
3192            reader
3193                .next()
3194                .expect("expect no errors reading batch")
3195                .expect("expect batch")
3196        };
3197        assert_eq!(sliced_batch, read_batch);
3198
3199        // TODO test file writer/reader
3200    }
3201
3202    #[test]
3203    fn encode_bools_slice() {
3204        // Test case for https://github.com/apache/arrow-rs/issues/3496
3205        assert_bool_roundtrip([true, false], 1, 1);
3206
3207        // slice somewhere in the middle
3208        assert_bool_roundtrip(
3209            [
3210                true, false, true, true, false, false, true, true, true, false, false, false, true,
3211                true, true, true, false, false, false, false, true, true, true, true, true, false,
3212                false, false, false, false,
3213            ],
3214            13,
3215            17,
3216        );
3217
3218        // start at byte boundary, end in the middle
3219        assert_bool_roundtrip(
3220            [
3221                true, false, true, true, false, false, true, true, true, false, false, false,
3222            ],
3223            8,
3224            2,
3225        );
3226
3227        // start and stop and byte boundary
3228        assert_bool_roundtrip(
3229            [
3230                true, false, true, true, false, false, true, true, true, false, false, false, true,
3231                true, true, true, true, false, false, false, false, false,
3232            ],
3233            8,
3234            8,
3235        );
3236    }
3237
3238    fn assert_bool_roundtrip<const N: usize>(bools: [bool; N], offset: usize, length: usize) {
3239        let val_bool_field = Field::new("val", DataType::Boolean, false);
3240
3241        let schema = Arc::new(Schema::new(vec![val_bool_field]));
3242
3243        let bools = BooleanArray::from(bools.to_vec());
3244
3245        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(bools)]).unwrap();
3246        let batch = batch.slice(offset, length);
3247
3248        let data = serialize_stream(&batch);
3249        let batch2 = deserialize_stream(data);
3250        assert_eq!(batch, batch2);
3251    }
3252
3253    #[test]
3254    fn test_run_array_unslice() {
3255        let total_len = 80;
3256        let vals: Vec<Option<i32>> = vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
3257        let repeats: Vec<usize> = vec![3, 4, 1, 2];
3258        let mut input_array: Vec<Option<i32>> = Vec::with_capacity(total_len);
3259        for ix in 0_usize..32 {
3260            let repeat: usize = repeats[ix % repeats.len()];
3261            let val: Option<i32> = vals[ix % vals.len()];
3262            input_array.resize(input_array.len() + repeat, val);
3263        }
3264
3265        // Encode the input_array to run array
3266        let mut builder =
3267            PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
3268        builder.extend(input_array.iter().copied());
3269        let run_array = builder.finish();
3270
3271        // test for all slice lengths.
3272        for slice_len in 1..=total_len {
3273            // test for offset = 0, slice length = slice_len
3274            let sliced_run_array: RunArray<Int16Type> =
3275                run_array.slice(0, slice_len).into_data().into();
3276
3277            // Create unsliced run array.
3278            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
3279            let typed = unsliced_run_array
3280                .downcast::<PrimitiveArray<Int32Type>>()
3281                .unwrap();
3282            let expected: Vec<Option<i32>> = input_array.iter().take(slice_len).copied().collect();
3283            let actual: Vec<Option<i32>> = typed.into_iter().collect();
3284            assert_eq!(expected, actual);
3285
3286            // test for offset = total_len - slice_len, length = slice_len
3287            let sliced_run_array: RunArray<Int16Type> = run_array
3288                .slice(total_len - slice_len, slice_len)
3289                .into_data()
3290                .into();
3291
3292            // Create unsliced run array.
3293            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
3294            let typed = unsliced_run_array
3295                .downcast::<PrimitiveArray<Int32Type>>()
3296                .unwrap();
3297            let expected: Vec<Option<i32>> = input_array
3298                .iter()
3299                .skip(total_len - slice_len)
3300                .copied()
3301                .collect();
3302            let actual: Vec<Option<i32>> = typed.into_iter().collect();
3303            assert_eq!(expected, actual);
3304        }
3305    }
3306
3307    fn generate_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3308        let mut ls = GenericListBuilder::<O, _>::new(UInt32Builder::new());
3309
3310        for i in 0..100_000 {
3311            for value in [i, i, i] {
3312                ls.values().append_value(value);
3313            }
3314            ls.append(true)
3315        }
3316
3317        ls.finish()
3318    }
3319
3320    fn generate_utf8view_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3321        let mut ls = GenericListBuilder::<O, _>::new(StringViewBuilder::new());
3322
3323        for i in 0..100_000 {
3324            for value in [
3325                format!("value{}", i),
3326                format!("value{}", i),
3327                format!("value{}", i),
3328            ] {
3329                ls.values().append_value(&value);
3330            }
3331            ls.append(true)
3332        }
3333
3334        ls.finish()
3335    }
3336
3337    fn generate_string_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3338        let mut ls = GenericListBuilder::<O, _>::new(StringBuilder::new());
3339
3340        for i in 0..100_000 {
3341            for value in [
3342                format!("value{}", i),
3343                format!("value{}", i),
3344                format!("value{}", i),
3345            ] {
3346                ls.values().append_value(&value);
3347            }
3348            ls.append(true)
3349        }
3350
3351        ls.finish()
3352    }
3353
3354    fn generate_nested_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3355        let mut ls =
3356            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
3357
3358        for _i in 0..10_000 {
3359            for j in 0..10 {
3360                for value in [j, j, j, j] {
3361                    ls.values().values().append_value(value);
3362                }
3363                ls.values().append(true)
3364            }
3365            ls.append(true);
3366        }
3367
3368        ls.finish()
3369    }
3370
3371    fn generate_nested_list_data_starting_at_zero<O: OffsetSizeTrait>() -> GenericListArray<O> {
3372        let mut ls =
3373            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
3374
3375        for _i in 0..999 {
3376            ls.values().append(true);
3377            ls.append(true);
3378        }
3379
3380        for j in 0..10 {
3381            for value in [j, j, j, j] {
3382                ls.values().values().append_value(value);
3383            }
3384            ls.values().append(true)
3385        }
3386        ls.append(true);
3387
3388        for i in 0..9_000 {
3389            for j in 0..10 {
3390                for value in [i + j, i + j, i + j, i + j] {
3391                    ls.values().values().append_value(value);
3392                }
3393                ls.values().append(true)
3394            }
3395            ls.append(true);
3396        }
3397
3398        ls.finish()
3399    }
3400
3401    fn generate_map_array_data() -> MapArray {
3402        let keys_builder = UInt32Builder::new();
3403        let values_builder = UInt32Builder::new();
3404
3405        let mut builder = MapBuilder::new(None, keys_builder, values_builder);
3406
3407        for i in 0..100_000 {
3408            for _j in 0..3 {
3409                builder.keys().append_value(i);
3410                builder.values().append_value(i * 2);
3411            }
3412            builder.append(true).unwrap();
3413        }
3414
3415        builder.finish()
3416    }
3417
3418    #[test]
3419    fn reencode_offsets_when_first_offset_is_not_zero() {
3420        let original_list = generate_list_data::<i32>();
3421        let original_data = original_list.into_data();
3422        let slice_data = original_data.slice(75, 7);
3423        let (new_offsets, original_start, length) =
3424            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
3425        assert_eq!(
3426            vec![0, 3, 6, 9, 12, 15, 18, 21],
3427            new_offsets.typed_data::<i32>()
3428        );
3429        assert_eq!(225, original_start);
3430        assert_eq!(21, length);
3431    }
3432
3433    #[test]
3434    fn reencode_offsets_when_first_offset_is_zero() {
3435        let mut ls = GenericListBuilder::<i32, _>::new(UInt32Builder::new());
3436        // ls = [[], [35, 42]
3437        ls.append(true);
3438        ls.values().append_value(35);
3439        ls.values().append_value(42);
3440        ls.append(true);
3441        let original_list = ls.finish();
3442        let original_data = original_list.into_data();
3443
3444        let slice_data = original_data.slice(1, 1);
3445        let (new_offsets, original_start, length) =
3446            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
3447        assert_eq!(vec![0, 2], new_offsets.typed_data::<i32>());
3448        assert_eq!(0, original_start);
3449        assert_eq!(2, length);
3450    }
3451
3452    /// Ensure when serde full & sliced versions they are equal to original input.
3453    /// Also ensure serialized sliced version is significantly smaller than serialized full.
3454    fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {
3455        // test both full and sliced versions
3456        let in_sliced = in_batch.slice(999, 1);
3457
3458        let bytes_batch = serialize_file(&in_batch);
3459        let bytes_sliced = serialize_file(&in_sliced);
3460
3461        // serializing 1 row should be significantly smaller than serializing 100,000
3462        assert!(bytes_sliced.len() < (bytes_batch.len() / expected_size_factor));
3463
3464        // ensure both are still valid and equal to originals
3465        let out_batch = deserialize_file(bytes_batch);
3466        assert_eq!(in_batch, out_batch);
3467
3468        let out_sliced = deserialize_file(bytes_sliced);
3469        assert_eq!(in_sliced, out_sliced);
3470    }
3471
3472    #[test]
3473    fn encode_lists() {
3474        let val_inner = Field::new_list_field(DataType::UInt32, true);
3475        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
3476        let schema = Arc::new(Schema::new(vec![val_list_field]));
3477
3478        let values = Arc::new(generate_list_data::<i32>());
3479
3480        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3481        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3482    }
3483
3484    #[test]
3485    fn encode_empty_list() {
3486        let val_inner = Field::new_list_field(DataType::UInt32, true);
3487        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
3488        let schema = Arc::new(Schema::new(vec![val_list_field]));
3489
3490        let values = Arc::new(generate_list_data::<i32>());
3491
3492        let in_batch = RecordBatch::try_new(schema, vec![values])
3493            .unwrap()
3494            .slice(999, 0);
3495        let out_batch = deserialize_file(serialize_file(&in_batch));
3496        assert_eq!(in_batch, out_batch);
3497    }
3498
3499    #[test]
3500    fn encode_large_lists() {
3501        let val_inner = Field::new_list_field(DataType::UInt32, true);
3502        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3503        let schema = Arc::new(Schema::new(vec![val_list_field]));
3504
3505        let values = Arc::new(generate_list_data::<i64>());
3506
3507        // ensure when serde full & sliced versions they are equal to original input
3508        // also ensure serialized sliced version is significantly smaller than serialized full
3509        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3510        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3511    }
3512
3513    #[test]
3514    fn encode_large_lists_non_zero_offset() {
3515        let val_inner = Field::new_list_field(DataType::UInt32, true);
3516        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3517        let schema = Arc::new(Schema::new(vec![val_list_field]));
3518
3519        let values = Arc::new(generate_list_data::<i64>());
3520
3521        check_sliced_list_array(schema, values);
3522    }
3523
3524    #[test]
3525    fn encode_large_lists_string_non_zero_offset() {
3526        let val_inner = Field::new_list_field(DataType::Utf8, true);
3527        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3528        let schema = Arc::new(Schema::new(vec![val_list_field]));
3529
3530        let values = Arc::new(generate_string_list_data::<i64>());
3531
3532        check_sliced_list_array(schema, values);
3533    }
3534
3535    #[test]
3536    fn encode_large_list_string_view_non_zero_offset() {
3537        let val_inner = Field::new_list_field(DataType::Utf8View, true);
3538        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3539        let schema = Arc::new(Schema::new(vec![val_list_field]));
3540
3541        let values = Arc::new(generate_utf8view_list_data::<i64>());
3542
3543        check_sliced_list_array(schema, values);
3544    }
3545
3546    fn check_sliced_list_array(schema: Arc<Schema>, values: Arc<GenericListArray<i64>>) {
3547        for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3548            let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3549                .unwrap()
3550                .slice(offset, len);
3551            let out_batch = deserialize_file(serialize_file(&in_batch));
3552            assert_eq!(in_batch, out_batch);
3553        }
3554    }
3555
3556    #[test]
3557    fn encode_nested_lists() {
3558        let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
3559        let inner_list_field = Arc::new(Field::new_list_field(DataType::List(inner_int), true));
3560        let list_field = Field::new("val", DataType::List(inner_list_field), true);
3561        let schema = Arc::new(Schema::new(vec![list_field]));
3562
3563        let values = Arc::new(generate_nested_list_data::<i32>());
3564
3565        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3566        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3567    }
3568
3569    #[test]
3570    fn encode_nested_lists_starting_at_zero() {
3571        let inner_int = Arc::new(Field::new("item", DataType::UInt32, true));
3572        let inner_list_field = Arc::new(Field::new("item", DataType::List(inner_int), true));
3573        let list_field = Field::new("val", DataType::List(inner_list_field), true);
3574        let schema = Arc::new(Schema::new(vec![list_field]));
3575
3576        let values = Arc::new(generate_nested_list_data_starting_at_zero::<i32>());
3577
3578        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3579        roundtrip_ensure_sliced_smaller(in_batch, 1);
3580    }
3581
3582    #[test]
3583    fn encode_map_array() {
3584        let keys = Arc::new(Field::new("keys", DataType::UInt32, false));
3585        let values = Arc::new(Field::new("values", DataType::UInt32, true));
3586        let map_field = Field::new_map("map", "entries", keys, values, false, true);
3587        let schema = Arc::new(Schema::new(vec![map_field]));
3588
3589        let values = Arc::new(generate_map_array_data());
3590
3591        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3592        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3593    }
3594
3595    fn generate_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
3596        let mut builder = GenericListViewBuilder::<O, _>::new(UInt32Builder::new());
3597
3598        for i in 0u32..100_000 {
3599            if i.is_multiple_of(10_000) {
3600                builder.append(false);
3601                continue;
3602            }
3603            for value in [i, i, i] {
3604                builder.values().append_value(value);
3605            }
3606            builder.append(true);
3607        }
3608
3609        builder.finish()
3610    }
3611
3612    #[test]
3613    fn encode_list_view_arrays() {
3614        let val_inner = Field::new_list_field(DataType::UInt32, true);
3615        let val_field = Field::new("val", DataType::ListView(Arc::new(val_inner)), true);
3616        let schema = Arc::new(Schema::new(vec![val_field]));
3617
3618        let values = Arc::new(generate_list_view_data::<i32>());
3619
3620        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3621        let out_batch = deserialize_file(serialize_file(&in_batch));
3622        assert_eq!(in_batch, out_batch);
3623    }
3624
3625    #[test]
3626    fn encode_large_list_view_arrays() {
3627        let val_inner = Field::new_list_field(DataType::UInt32, true);
3628        let val_field = Field::new("val", DataType::LargeListView(Arc::new(val_inner)), true);
3629        let schema = Arc::new(Schema::new(vec![val_field]));
3630
3631        let values = Arc::new(generate_list_view_data::<i64>());
3632
3633        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3634        let out_batch = deserialize_file(serialize_file(&in_batch));
3635        assert_eq!(in_batch, out_batch);
3636    }
3637
3638    #[test]
3639    fn check_sliced_list_view_array() {
3640        let inner = Field::new_list_field(DataType::UInt32, true);
3641        let field = Field::new("val", DataType::ListView(Arc::new(inner)), true);
3642        let schema = Arc::new(Schema::new(vec![field]));
3643        let values = Arc::new(generate_list_view_data::<i32>());
3644
3645        for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3646            let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3647                .unwrap()
3648                .slice(offset, len);
3649            let out_batch = deserialize_file(serialize_file(&in_batch));
3650            assert_eq!(in_batch, out_batch);
3651        }
3652    }
3653
3654    #[test]
3655    fn check_sliced_large_list_view_array() {
3656        let inner = Field::new_list_field(DataType::UInt32, true);
3657        let field = Field::new("val", DataType::LargeListView(Arc::new(inner)), true);
3658        let schema = Arc::new(Schema::new(vec![field]));
3659        let values = Arc::new(generate_list_view_data::<i64>());
3660
3661        for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3662            let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3663                .unwrap()
3664                .slice(offset, len);
3665            let out_batch = deserialize_file(serialize_file(&in_batch));
3666            assert_eq!(in_batch, out_batch);
3667        }
3668    }
3669
3670    fn generate_nested_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
3671        let inner_builder = UInt32Builder::new();
3672        let middle_builder = GenericListViewBuilder::<O, _>::new(inner_builder);
3673        let mut outer_builder = GenericListViewBuilder::<O, _>::new(middle_builder);
3674
3675        for i in 0u32..10_000 {
3676            if i.is_multiple_of(1_000) {
3677                outer_builder.append(false);
3678                continue;
3679            }
3680
3681            for _ in 0..3 {
3682                for value in [i, i + 1, i + 2] {
3683                    outer_builder.values().values().append_value(value);
3684                }
3685                outer_builder.values().append(true);
3686            }
3687            outer_builder.append(true);
3688        }
3689
3690        outer_builder.finish()
3691    }
3692
3693    #[test]
3694    fn encode_nested_list_views() {
3695        let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
3696        let inner_list_field = Arc::new(Field::new_list_field(DataType::ListView(inner_int), true));
3697        let list_field = Field::new("val", DataType::ListView(inner_list_field), true);
3698        let schema = Arc::new(Schema::new(vec![list_field]));
3699
3700        let values = Arc::new(generate_nested_list_view_data::<i32>());
3701
3702        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3703        let out_batch = deserialize_file(serialize_file(&in_batch));
3704        assert_eq!(in_batch, out_batch);
3705    }
3706
3707    fn test_roundtrip_list_view_of_dict_impl<OffsetSize: OffsetSizeTrait, U: ArrowNativeType>(
3708        list_data_type: DataType,
3709        offsets: &[U; 5],
3710        sizes: &[U; 4],
3711    ) {
3712        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3713        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
3714        let dict_array = DictionaryArray::new(keys, Arc::new(values));
3715        let dict_data = dict_array.to_data();
3716
3717        let value_offsets = Buffer::from_slice_ref(offsets);
3718        let value_sizes = Buffer::from_slice_ref(sizes);
3719
3720        let list_data = ArrayData::builder(list_data_type)
3721            .len(4)
3722            .add_buffer(value_offsets)
3723            .add_buffer(value_sizes)
3724            .add_child_data(dict_data)
3725            .build()
3726            .unwrap();
3727        let list_view_array = GenericListViewArray::<OffsetSize>::from(list_data);
3728
3729        let schema = Arc::new(Schema::new(vec![Field::new(
3730            "f1",
3731            list_view_array.data_type().clone(),
3732            false,
3733        )]));
3734        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(list_view_array)]).unwrap();
3735
3736        let output_batch = deserialize_file(serialize_file(&input_batch));
3737        assert_eq!(input_batch, output_batch);
3738
3739        let output_batch = deserialize_stream(serialize_stream(&input_batch));
3740        assert_eq!(input_batch, output_batch);
3741    }
3742
3743    #[test]
3744    fn test_roundtrip_list_view_of_dict() {
3745        #[allow(deprecated)]
3746        let list_data_type = DataType::ListView(Arc::new(Field::new_dict(
3747            "item",
3748            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3749            true,
3750            1,
3751            false,
3752        )));
3753        let offsets: &[i32; 5] = &[0, 2, 4, 4, 7];
3754        let sizes: &[i32; 4] = &[2, 2, 0, 3];
3755        test_roundtrip_list_view_of_dict_impl::<i32, i32>(list_data_type, offsets, sizes);
3756    }
3757
3758    #[test]
3759    fn test_roundtrip_large_list_view_of_dict() {
3760        #[allow(deprecated)]
3761        let list_data_type = DataType::LargeListView(Arc::new(Field::new_dict(
3762            "item",
3763            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3764            true,
3765            2,
3766            false,
3767        )));
3768        let offsets: &[i64; 5] = &[0, 2, 4, 4, 7];
3769        let sizes: &[i64; 4] = &[2, 2, 0, 3];
3770        test_roundtrip_list_view_of_dict_impl::<i64, i64>(list_data_type, offsets, sizes);
3771    }
3772
3773    #[test]
3774    fn test_roundtrip_sliced_list_view_of_dict() {
3775        #[allow(deprecated)]
3776        let list_data_type = DataType::ListView(Arc::new(Field::new_dict(
3777            "item",
3778            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3779            true,
3780            3,
3781            false,
3782        )));
3783
3784        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3785        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2, 1, 0, 3, 2, 1]);
3786        let dict_array = DictionaryArray::new(keys, Arc::new(values));
3787        let dict_data = dict_array.to_data();
3788
3789        let offsets: &[i32; 7] = &[0, 2, 4, 4, 7, 9, 12];
3790        let sizes: &[i32; 6] = &[2, 2, 0, 3, 2, 3];
3791        let value_offsets = Buffer::from_slice_ref(offsets);
3792        let value_sizes = Buffer::from_slice_ref(sizes);
3793
3794        let list_data = ArrayData::builder(list_data_type)
3795            .len(6)
3796            .add_buffer(value_offsets)
3797            .add_buffer(value_sizes)
3798            .add_child_data(dict_data)
3799            .build()
3800            .unwrap();
3801        let list_view_array = GenericListViewArray::<i32>::from(list_data);
3802
3803        let schema = Arc::new(Schema::new(vec![Field::new(
3804            "f1",
3805            list_view_array.data_type().clone(),
3806            false,
3807        )]));
3808        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(list_view_array)]).unwrap();
3809
3810        let sliced_batch = input_batch.slice(1, 4);
3811
3812        let output_batch = deserialize_file(serialize_file(&sliced_batch));
3813        assert_eq!(sliced_batch, output_batch);
3814
3815        let output_batch = deserialize_stream(serialize_stream(&sliced_batch));
3816        assert_eq!(sliced_batch, output_batch);
3817    }
3818
3819    #[test]
3820    fn test_roundtrip_dense_union_of_dict() {
3821        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3822        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
3823        let dict_array = DictionaryArray::new(keys, Arc::new(values));
3824
3825        #[allow(deprecated)]
3826        let dict_field = Arc::new(Field::new_dict(
3827            "dict",
3828            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3829            true,
3830            1,
3831            false,
3832        ));
3833        let int_field = Arc::new(Field::new("int", DataType::Int32, false));
3834        let union_fields = UnionFields::try_new(vec![0, 1], vec![dict_field, int_field]).unwrap();
3835
3836        let types = ScalarBuffer::from(vec![0i8, 0, 1, 0, 1, 0, 0]);
3837        let offsets = ScalarBuffer::from(vec![0i32, 1, 0, 2, 1, 3, 4]);
3838
3839        let int_array = Int32Array::from(vec![100, 200]);
3840
3841        let union = UnionArray::try_new(
3842            union_fields.clone(),
3843            types,
3844            Some(offsets),
3845            vec![Arc::new(dict_array), Arc::new(int_array)],
3846        )
3847        .unwrap();
3848
3849        let schema = Arc::new(Schema::new(vec![Field::new(
3850            "union",
3851            DataType::Union(union_fields, UnionMode::Dense),
3852            false,
3853        )]));
3854        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
3855
3856        let output_batch = deserialize_file(serialize_file(&input_batch));
3857        assert_eq!(input_batch, output_batch);
3858
3859        let output_batch = deserialize_stream(serialize_stream(&input_batch));
3860        assert_eq!(input_batch, output_batch);
3861    }
3862
3863    #[test]
3864    fn test_roundtrip_sparse_union_of_dict() {
3865        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3866        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
3867        let dict_array = DictionaryArray::new(keys, Arc::new(values));
3868
3869        #[allow(deprecated)]
3870        let dict_field = Arc::new(Field::new_dict(
3871            "dict",
3872            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3873            true,
3874            2,
3875            false,
3876        ));
3877        let int_field = Arc::new(Field::new("int", DataType::Int32, false));
3878        let union_fields = UnionFields::try_new(vec![0, 1], vec![dict_field, int_field]).unwrap();
3879
3880        let types = ScalarBuffer::from(vec![0i8, 0, 1, 0, 1, 0, 0]);
3881
3882        let int_array = Int32Array::from(vec![0, 0, 100, 0, 200, 0, 0]);
3883
3884        let union = UnionArray::try_new(
3885            union_fields.clone(),
3886            types,
3887            None,
3888            vec![Arc::new(dict_array), Arc::new(int_array)],
3889        )
3890        .unwrap();
3891
3892        let schema = Arc::new(Schema::new(vec![Field::new(
3893            "union",
3894            DataType::Union(union_fields, UnionMode::Sparse),
3895            false,
3896        )]));
3897        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
3898
3899        let output_batch = deserialize_file(serialize_file(&input_batch));
3900        assert_eq!(input_batch, output_batch);
3901
3902        let output_batch = deserialize_stream(serialize_stream(&input_batch));
3903        assert_eq!(input_batch, output_batch);
3904    }
3905
3906    #[test]
3907    fn test_roundtrip_map_with_dict_keys() {
3908        // Building a map array is a bit involved. We first build a struct arary that has a key and
3909        // value field and then use that to build the actual map array.
3910        let key_values = StringArray::from(vec!["key_a", "key_b", "key_c"]);
3911        let keys = Int32Array::from_iter_values([0, 1, 2, 0, 1, 0]);
3912        let dict_keys = DictionaryArray::new(keys, Arc::new(key_values));
3913
3914        let values = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
3915
3916        #[allow(deprecated)]
3917        let entries_field = Arc::new(Field::new(
3918            "entries",
3919            DataType::Struct(
3920                vec![
3921                    Field::new_dict(
3922                        "key",
3923                        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3924                        false,
3925                        1,
3926                        false,
3927                    ),
3928                    Field::new("value", DataType::Int32, true),
3929                ]
3930                .into(),
3931            ),
3932            false,
3933        ));
3934
3935        let entries = StructArray::from(vec![
3936            (
3937                Arc::new(Field::new(
3938                    "key",
3939                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3940                    false,
3941                )),
3942                Arc::new(dict_keys) as ArrayRef,
3943            ),
3944            (
3945                Arc::new(Field::new("value", DataType::Int32, true)),
3946                Arc::new(values) as ArrayRef,
3947            ),
3948        ]);
3949
3950        let offsets = Buffer::from_slice_ref([0i32, 2, 4, 6]);
3951
3952        let map_data = ArrayData::builder(DataType::Map(entries_field, false))
3953            .len(3)
3954            .add_buffer(offsets)
3955            .add_child_data(entries.into_data())
3956            .build()
3957            .unwrap();
3958        let map_array = MapArray::from(map_data);
3959
3960        let schema = Arc::new(Schema::new(vec![Field::new(
3961            "map",
3962            map_array.data_type().clone(),
3963            false,
3964        )]));
3965        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(map_array)]).unwrap();
3966
3967        let output_batch = deserialize_file(serialize_file(&input_batch));
3968        assert_eq!(input_batch, output_batch);
3969
3970        let output_batch = deserialize_stream(serialize_stream(&input_batch));
3971        assert_eq!(input_batch, output_batch);
3972    }
3973
3974    #[test]
3975    fn test_roundtrip_map_with_dict_values() {
3976        // Building a map array is a bit involved. We first build a struct arary that has a key and
3977        // value field and then use that to build the actual map array.
3978        let keys = StringArray::from(vec!["a", "b", "c", "d", "e", "f"]);
3979
3980        let value_values = StringArray::from(vec!["val_x", "val_y", "val_z"]);
3981        let value_keys = Int32Array::from_iter_values([0, 1, 2, 0, 1, 0]);
3982        let dict_values = DictionaryArray::new(value_keys, Arc::new(value_values));
3983
3984        #[allow(deprecated)]
3985        let entries_field = Arc::new(Field::new(
3986            "entries",
3987            DataType::Struct(
3988                vec![
3989                    Field::new("key", DataType::Utf8, false),
3990                    Field::new_dict(
3991                        "value",
3992                        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3993                        true,
3994                        2,
3995                        false,
3996                    ),
3997                ]
3998                .into(),
3999            ),
4000            false,
4001        ));
4002
4003        let entries = StructArray::from(vec![
4004            (
4005                Arc::new(Field::new("key", DataType::Utf8, false)),
4006                Arc::new(keys) as ArrayRef,
4007            ),
4008            (
4009                Arc::new(Field::new(
4010                    "value",
4011                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
4012                    true,
4013                )),
4014                Arc::new(dict_values) as ArrayRef,
4015            ),
4016        ]);
4017
4018        let offsets = Buffer::from_slice_ref([0i32, 2, 4, 6]);
4019
4020        let map_data = ArrayData::builder(DataType::Map(entries_field, false))
4021            .len(3)
4022            .add_buffer(offsets)
4023            .add_child_data(entries.into_data())
4024            .build()
4025            .unwrap();
4026        let map_array = MapArray::from(map_data);
4027
4028        let schema = Arc::new(Schema::new(vec![Field::new(
4029            "map",
4030            map_array.data_type().clone(),
4031            false,
4032        )]));
4033        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(map_array)]).unwrap();
4034
4035        let output_batch = deserialize_file(serialize_file(&input_batch));
4036        assert_eq!(input_batch, output_batch);
4037
4038        let output_batch = deserialize_stream(serialize_stream(&input_batch));
4039        assert_eq!(input_batch, output_batch);
4040    }
4041
4042    #[test]
4043    fn test_decimal128_alignment16_is_sufficient() {
4044        const IPC_ALIGNMENT: usize = 16;
4045
4046        // Test a bunch of different dimensions to ensure alignment is never an issue.
4047        // For example, if we only test `num_cols = 1` then even with alignment 8 this
4048        // test would _happen_ to pass, even though for different dimensions like
4049        // `num_cols = 2` it would fail.
4050        for num_cols in [1, 2, 3, 17, 50, 73, 99] {
4051            let num_rows = (num_cols * 7 + 11) % 100; // Deterministic swizzle
4052
4053            let mut fields = Vec::new();
4054            let mut arrays = Vec::new();
4055            for i in 0..num_cols {
4056                let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
4057                let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
4058                fields.push(field);
4059                arrays.push(Arc::new(array) as Arc<dyn Array>);
4060            }
4061            let schema = Schema::new(fields);
4062            let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();
4063
4064            let mut writer = FileWriter::try_new_with_options(
4065                Vec::new(),
4066                batch.schema_ref(),
4067                IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
4068            )
4069            .unwrap();
4070            writer.write(&batch).unwrap();
4071            writer.finish().unwrap();
4072
4073            let out: Vec<u8> = writer.into_inner().unwrap();
4074
4075            let buffer = Buffer::from_vec(out);
4076            let trailer_start = buffer.len() - 10;
4077            let footer_len =
4078                read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
4079            let footer =
4080                root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
4081
4082            let schema = fb_to_schema(footer.schema().unwrap());
4083
4084            // Importantly we set `require_alignment`, checking that 16-byte alignment is sufficient
4085            // for `read_record_batch` later on to read the data in a zero-copy manner.
4086            let decoder =
4087                FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);
4088
4089            let batches = footer.recordBatches().unwrap();
4090
4091            let block = batches.get(0);
4092            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
4093            let data = buffer.slice_with_length(block.offset() as _, block_len);
4094
4095            let batch2 = decoder.read_record_batch(block, &data).unwrap().unwrap();
4096
4097            assert_eq!(batch, batch2);
4098        }
4099    }
4100
4101    #[test]
4102    fn test_decimal128_alignment8_is_unaligned() {
4103        const IPC_ALIGNMENT: usize = 8;
4104
4105        let num_cols = 2;
4106        let num_rows = 1;
4107
4108        let mut fields = Vec::new();
4109        let mut arrays = Vec::new();
4110        for i in 0..num_cols {
4111            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
4112            let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
4113            fields.push(field);
4114            arrays.push(Arc::new(array) as Arc<dyn Array>);
4115        }
4116        let schema = Schema::new(fields);
4117        let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();
4118
4119        let mut writer = FileWriter::try_new_with_options(
4120            Vec::new(),
4121            batch.schema_ref(),
4122            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
4123        )
4124        .unwrap();
4125        writer.write(&batch).unwrap();
4126        writer.finish().unwrap();
4127
4128        let out: Vec<u8> = writer.into_inner().unwrap();
4129
4130        let buffer = Buffer::from_vec(out);
4131        let trailer_start = buffer.len() - 10;
4132        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
4133        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
4134        let schema = fb_to_schema(footer.schema().unwrap());
4135
4136        // Importantly we set `require_alignment`, otherwise the error later is suppressed due to copying
4137        // to an aligned buffer in `ArrayDataBuilder.build_aligned`.
4138        let decoder =
4139            FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);
4140
4141        let batches = footer.recordBatches().unwrap();
4142
4143        let block = batches.get(0);
4144        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
4145        let data = buffer.slice_with_length(block.offset() as _, block_len);
4146
4147        let result = decoder.read_record_batch(block, &data);
4148
4149        let error = result.unwrap_err();
4150        assert_eq!(
4151            error.to_string(),
4152            "Invalid argument error: Misaligned buffers[0] in array of type Decimal128(38, 10), \
4153             offset from expected alignment of 16 by 8"
4154        );
4155    }
4156
4157    #[test]
4158    fn test_flush() {
4159        // We write a schema which is small enough to fit into a buffer and not get flushed,
4160        // and then force the write with .flush().
4161        let num_cols = 2;
4162        let mut fields = Vec::new();
4163        let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
4164        for i in 0..num_cols {
4165            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
4166            fields.push(field);
4167        }
4168        let schema = Schema::new(fields);
4169        let inner_stream_writer = BufWriter::with_capacity(1024, Vec::new());
4170        let inner_file_writer = BufWriter::with_capacity(1024, Vec::new());
4171        let mut stream_writer =
4172            StreamWriter::try_new_with_options(inner_stream_writer, &schema, options.clone())
4173                .unwrap();
4174        let mut file_writer =
4175            FileWriter::try_new_with_options(inner_file_writer, &schema, options).unwrap();
4176
4177        let stream_bytes_written_on_new = stream_writer.get_ref().get_ref().len();
4178        let file_bytes_written_on_new = file_writer.get_ref().get_ref().len();
4179        stream_writer.flush().unwrap();
4180        file_writer.flush().unwrap();
4181        let stream_bytes_written_on_flush = stream_writer.get_ref().get_ref().len();
4182        let file_bytes_written_on_flush = file_writer.get_ref().get_ref().len();
4183        let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
4184        // Finishing a stream writes the continuation bytes in MetadataVersion::V5 (4 bytes)
4185        // and then a length of 0 (4 bytes) for a total of 8 bytes.
4186        // Everything before that should have been flushed in the .flush() call.
4187        let expected_stream_flushed_bytes = stream_out.len() - 8;
4188        // A file write is the same as the stream write except for the leading magic string
4189        // ARROW1 plus padding, which is 8 bytes.
4190        let expected_file_flushed_bytes = expected_stream_flushed_bytes + 8;
4191
4192        assert!(
4193            stream_bytes_written_on_new < stream_bytes_written_on_flush,
4194            "this test makes no sense if flush is not actually required"
4195        );
4196        assert!(
4197            file_bytes_written_on_new < file_bytes_written_on_flush,
4198            "this test makes no sense if flush is not actually required"
4199        );
4200        assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
4201        assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
4202    }
4203
4204    #[test]
4205    fn test_roundtrip_list_of_fixed_list() -> Result<(), ArrowError> {
4206        let l1_type =
4207            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, false)), 3);
4208        let l2_type = DataType::List(Arc::new(Field::new("item", l1_type.clone(), false)));
4209
4210        let l0_builder = Float32Builder::new();
4211        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3).with_field(Arc::new(Field::new(
4212            "item",
4213            DataType::Float32,
4214            false,
4215        )));
4216        let mut l2_builder =
4217            ListBuilder::new(l1_builder).with_field(Arc::new(Field::new("item", l1_type, false)));
4218
4219        for point in [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]] {
4220            l2_builder.values().values().append_value(point[0]);
4221            l2_builder.values().values().append_value(point[1]);
4222            l2_builder.values().values().append_value(point[2]);
4223
4224            l2_builder.values().append(true);
4225        }
4226        l2_builder.append(true);
4227
4228        let point = [10., 11., 12.];
4229        l2_builder.values().values().append_value(point[0]);
4230        l2_builder.values().values().append_value(point[1]);
4231        l2_builder.values().values().append_value(point[2]);
4232
4233        l2_builder.values().append(true);
4234        l2_builder.append(true);
4235
4236        let array = Arc::new(l2_builder.finish()) as ArrayRef;
4237
4238        let schema = Arc::new(Schema::new_with_metadata(
4239            vec![Field::new("points", l2_type, false)],
4240            HashMap::default(),
4241        ));
4242
4243        // Test a variety of combinations that include 0 and non-zero offsets
4244        // and also portions or the rest of the array
4245        test_slices(&array, &schema, 0, 1)?;
4246        test_slices(&array, &schema, 0, 2)?;
4247        test_slices(&array, &schema, 1, 1)?;
4248
4249        Ok(())
4250    }
4251
4252    #[test]
4253    fn test_roundtrip_list_of_fixed_list_w_nulls() -> Result<(), ArrowError> {
4254        let l0_builder = Float32Builder::new();
4255        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3);
4256        let mut l2_builder = ListBuilder::new(l1_builder);
4257
4258        for point in [
4259            [Some(1.0), Some(2.0), None],
4260            [Some(4.0), Some(5.0), Some(6.0)],
4261            [None, Some(8.0), Some(9.0)],
4262        ] {
4263            for p in point {
4264                match p {
4265                    Some(p) => l2_builder.values().values().append_value(p),
4266                    None => l2_builder.values().values().append_null(),
4267                }
4268            }
4269
4270            l2_builder.values().append(true);
4271        }
4272        l2_builder.append(true);
4273
4274        let point = [Some(10.), None, None];
4275        for p in point {
4276            match p {
4277                Some(p) => l2_builder.values().values().append_value(p),
4278                None => l2_builder.values().values().append_null(),
4279            }
4280        }
4281
4282        l2_builder.values().append(true);
4283        l2_builder.append(true);
4284
4285        let array = Arc::new(l2_builder.finish()) as ArrayRef;
4286
4287        let schema = Arc::new(Schema::new_with_metadata(
4288            vec![Field::new(
4289                "points",
4290                DataType::List(Arc::new(Field::new(
4291                    "item",
4292                    DataType::FixedSizeList(
4293                        Arc::new(Field::new("item", DataType::Float32, true)),
4294                        3,
4295                    ),
4296                    true,
4297                ))),
4298                true,
4299            )],
4300            HashMap::default(),
4301        ));
4302
4303        // Test a variety of combinations that include 0 and non-zero offsets
4304        // and also portions or the rest of the array
4305        test_slices(&array, &schema, 0, 1)?;
4306        test_slices(&array, &schema, 0, 2)?;
4307        test_slices(&array, &schema, 1, 1)?;
4308
4309        Ok(())
4310    }
4311
4312    fn test_slices(
4313        parent_array: &ArrayRef,
4314        schema: &SchemaRef,
4315        offset: usize,
4316        length: usize,
4317    ) -> Result<(), ArrowError> {
4318        let subarray = parent_array.slice(offset, length);
4319        let original_batch = RecordBatch::try_new(schema.clone(), vec![subarray])?;
4320
4321        let mut bytes = Vec::new();
4322        let mut writer = StreamWriter::try_new(&mut bytes, schema)?;
4323        writer.write(&original_batch)?;
4324        writer.finish()?;
4325
4326        let mut cursor = std::io::Cursor::new(bytes);
4327        let mut reader = StreamReader::try_new(&mut cursor, None)?;
4328        let returned_batch = reader.next().unwrap()?;
4329
4330        assert_eq!(original_batch, returned_batch);
4331
4332        Ok(())
4333    }
4334
4335    #[test]
4336    fn test_roundtrip_fixed_list() -> Result<(), ArrowError> {
4337        let int_builder = Int64Builder::new();
4338        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3)
4339            .with_field(Arc::new(Field::new("item", DataType::Int64, false)));
4340
4341        for point in [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]] {
4342            fixed_list_builder.values().append_value(point[0]);
4343            fixed_list_builder.values().append_value(point[1]);
4344            fixed_list_builder.values().append_value(point[2]);
4345
4346            fixed_list_builder.append(true);
4347        }
4348
4349        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;
4350
4351        let schema = Arc::new(Schema::new_with_metadata(
4352            vec![Field::new(
4353                "points",
4354                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, false)), 3),
4355                false,
4356            )],
4357            HashMap::default(),
4358        ));
4359
4360        // Test a variety of combinations that include 0 and non-zero offsets
4361        // and also portions or the rest of the array
4362        test_slices(&array, &schema, 0, 4)?;
4363        test_slices(&array, &schema, 0, 2)?;
4364        test_slices(&array, &schema, 1, 3)?;
4365        test_slices(&array, &schema, 2, 1)?;
4366
4367        Ok(())
4368    }
4369
4370    #[test]
4371    fn test_roundtrip_fixed_list_w_nulls() -> Result<(), ArrowError> {
4372        let int_builder = Int64Builder::new();
4373        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3);
4374
4375        for point in [
4376            [Some(1), Some(2), None],
4377            [Some(4), Some(5), Some(6)],
4378            [None, Some(8), Some(9)],
4379            [Some(10), None, None],
4380        ] {
4381            for p in point {
4382                match p {
4383                    Some(p) => fixed_list_builder.values().append_value(p),
4384                    None => fixed_list_builder.values().append_null(),
4385                }
4386            }
4387
4388            fixed_list_builder.append(true);
4389        }
4390
4391        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;
4392
4393        let schema = Arc::new(Schema::new_with_metadata(
4394            vec![Field::new(
4395                "points",
4396                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 3),
4397                true,
4398            )],
4399            HashMap::default(),
4400        ));
4401
4402        // Test a variety of combinations that include 0 and non-zero offsets
4403        // and also portions or the rest of the array
4404        test_slices(&array, &schema, 0, 4)?;
4405        test_slices(&array, &schema, 0, 2)?;
4406        test_slices(&array, &schema, 1, 3)?;
4407        test_slices(&array, &schema, 2, 1)?;
4408
4409        Ok(())
4410    }
4411
4412    #[test]
4413    fn test_metadata_encoding_ordering() {
4414        fn create_hash() -> u64 {
4415            let metadata: HashMap<String, String> = [
4416                ("a", "1"), //
4417                ("b", "2"), //
4418                ("c", "3"), //
4419                ("d", "4"), //
4420                ("e", "5"), //
4421            ]
4422            .into_iter()
4423            .map(|(k, v)| (k.to_owned(), v.to_owned()))
4424            .collect();
4425
4426            // Set metadata on both the schema and a field within it.
4427            let schema = Arc::new(
4428                Schema::new(vec![
4429                    Field::new("a", DataType::Int64, true).with_metadata(metadata.clone()),
4430                ])
4431                .with_metadata(metadata)
4432                .clone(),
4433            );
4434            let batch = RecordBatch::new_empty(schema.clone());
4435
4436            let mut bytes = Vec::new();
4437            let mut w = StreamWriter::try_new(&mut bytes, batch.schema_ref()).unwrap();
4438            w.write(&batch).unwrap();
4439            w.finish().unwrap();
4440
4441            let mut h = std::hash::DefaultHasher::new();
4442            h.write(&bytes);
4443            h.finish()
4444        }
4445
4446        let expected = create_hash();
4447
4448        // Since there is randomness in the HashMap and we cannot specify our
4449        // own Hasher for the implementation used for metadata, run the above
4450        // code 20x and verify it does not change. This is not perfect but it
4451        // should be good enough.
4452        let all_passed = (0..20).all(|_| create_hash() == expected);
4453        assert!(all_passed);
4454    }
4455
4456    #[test]
4457    fn test_dictionary_tracker_reset() {
4458        let data_gen = IpcDataGenerator::default();
4459        let mut dictionary_tracker = DictionaryTracker::new(false);
4460        let writer_options = IpcWriteOptions::default();
4461        let mut compression_ctx = CompressionContext::default();
4462
4463        let schema = Arc::new(Schema::new(vec![Field::new(
4464            "a",
4465            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
4466            false,
4467        )]));
4468
4469        let mut write_single_batch_stream =
4470            |batch: RecordBatch, dict_tracker: &mut DictionaryTracker| -> Vec<u8> {
4471                let mut buffer = Vec::new();
4472
4473                // create a new IPC stream:
4474                let stream_header = data_gen.schema_to_bytes_with_dictionary_tracker(
4475                    &schema,
4476                    dict_tracker,
4477                    &writer_options,
4478                );
4479                _ = write_message(&mut buffer, stream_header, &writer_options).unwrap();
4480
4481                let (encoded_dicts, encoded_batch) = data_gen
4482                    .encode(&batch, dict_tracker, &writer_options, &mut compression_ctx)
4483                    .unwrap();
4484                for encoded_dict in encoded_dicts {
4485                    _ = write_message(&mut buffer, encoded_dict, &writer_options).unwrap();
4486                }
4487                _ = write_message(&mut buffer, encoded_batch, &writer_options).unwrap();
4488
4489                buffer
4490            };
4491
4492        let batch1 = RecordBatch::try_new(
4493            schema.clone(),
4494            vec![Arc::new(DictionaryArray::new(
4495                UInt8Array::from_iter_values([0]),
4496                Arc::new(StringArray::from_iter_values(["a"])),
4497            ))],
4498        )
4499        .unwrap();
4500        let buffer = write_single_batch_stream(batch1.clone(), &mut dictionary_tracker);
4501
4502        // ensure we can read the stream back
4503        let mut reader = StreamReader::try_new(Cursor::new(buffer), None).unwrap();
4504        let read_batch = reader.next().unwrap().unwrap();
4505        assert_eq!(read_batch, batch1);
4506
4507        // reset the dictionary tracker so it can be used for next stream
4508        dictionary_tracker.clear();
4509
4510        // now write a 2nd stream and ensure we can also read it:
4511        let batch2 = RecordBatch::try_new(
4512            schema.clone(),
4513            vec![Arc::new(DictionaryArray::new(
4514                UInt8Array::from_iter_values([0]),
4515                Arc::new(StringArray::from_iter_values(["a"])),
4516            ))],
4517        )
4518        .unwrap();
4519        let buffer = write_single_batch_stream(batch2.clone(), &mut dictionary_tracker);
4520        let mut reader = StreamReader::try_new(Cursor::new(buffer), None).unwrap();
4521        let read_batch = reader.next().unwrap().unwrap();
4522        assert_eq!(read_batch, batch2);
4523    }
4524}