Skip to main content

arrow_ipc/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Arrow IPC File and Stream Writers
19//!
20//! # Notes
21//!
22//! [`FileWriter`] and [`StreamWriter`] have similar interfaces,
23//! however files produced by the [`FileWriter`] are meant to be read with a reader that supports [`Seek`]ing
24//!
25//! [`Seek`]: std::io::Seek
26
27use std::cmp::min;
28use std::collections::HashMap;
29use std::io::{BufWriter, Write};
30use std::mem::size_of;
31use std::sync::Arc;
32
33use flatbuffers::FlatBufferBuilder;
34
35use arrow_array::builder::BufferBuilder;
36use arrow_array::cast::*;
37use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
38use arrow_array::*;
39use arrow_buffer::bit_util;
40use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ToByteSlice};
41use arrow_data::{ArrayData, ArrayDataBuilder, BufferSpec, layout};
42use arrow_schema::*;
43
44use crate::CONTINUATION_MARKER;
45use crate::compression::CompressionCodec;
46pub use crate::compression::CompressionContext;
47use crate::convert::IpcSchemaEncoder;
48
49/// IPC write options used to control the behaviour of the [`IpcDataGenerator`]
50#[derive(Debug, Clone)]
51pub struct IpcWriteOptions {
52    /// Write padding after memory buffers to this multiple of bytes.
53    /// Must be 8, 16, 32, or 64 - defaults to 64.
    /// The flatbuffer message header is also padded to this multiple.
54    alignment: u8,
55    /// The legacy format is for releases before 0.15.0, and uses metadata V4
    /// (it writes a 4-byte message prefix instead of 8 bytes; rejected with V5).
56    write_legacy_ipc_format: bool,
57    /// The metadata version to write. The Rust IPC writer supports V4+
58    ///
59    /// *Default versions per crate*
60    ///
61    /// When creating the default IpcWriteOptions, the following metadata versions are used:
62    ///
63    /// version 2.0.0: V4, with legacy format enabled
64    /// version 4.0.0: V5
65    metadata_version: crate::MetadataVersion,
66    /// Compression, if desired. Will result in a runtime error
67    /// if the corresponding feature is not enabled
68    batch_compression_type: Option<crate::CompressionType>,
69    /// Compression level for `batch_compression_type`; `None` builds the codec without
    /// an explicit level. Only ZSTD accepts a level (LZ4 Frame rejects one).
70    batch_compression_level: Option<i32>,
71    /// How to handle updating dictionaries in IPC messages
72    dictionary_handling: DictionaryHandling,
73}
74
75/// A single buffer segment ready to be written to the output stream.
76///
77/// For the uncompressed path the original Arc-backed [`Buffer`] is stored
78/// directly (zero copy). For the compressed path the compressed bytes are
79/// owned by a scratch `Vec<u8>`.
///
/// Alignment padding is not part of the segment; it is written separately after it.
80enum EncodedBuffer {
81    /// Uncompressed: Arc-backed reference to the original array buffer.
82    Raw(Buffer),
83    /// Compressed: owned scratch bytes produced by the codec.
84    Compressed(Vec<u8>),
85}
86
87impl EncodedBuffer {
88    fn as_slice(&self) -> &[u8] {
89        match self {
90            EncodedBuffer::Raw(b) => b.as_slice(),
91            EncodedBuffer::Compressed(v) => v.as_slice(),
92        }
93    }
94
95    fn len(&self) -> usize {
96        match self {
97            EncodedBuffer::Raw(b) => b.len(),
98            EncodedBuffer::Compressed(v) => v.len(),
99        }
100    }
101}
102/// Accumulates the IPC metadata produced by [`write_array_data`].
103///
104/// `nodes` and `buffers` are serialised into the flatbuffer `RecordBatch` (or `DictionaryBatch`)
105/// header. The companion [`IpcBodySink`] holds the actual encoded bytes.
106#[derive(Default)]
107struct IpcMetadataBuilder {
    /// Field nodes, serialised as the header's `nodes` vector.
108    nodes: Vec<crate::FieldNode>,
    /// Buffer descriptors (body offset + length), serialised as the header's `buffers` vector.
109    buffers: Vec<crate::Buffer>,
110}
111
112/// Destination for the raw Arrow data bytes (the IPC message body) produced by [`write_array_data`].
113///
114/// The companion [`IpcMetadataBuilder`] accumulates the flatbuffer metadata
115/// (offset + length of each buffer in the body); together they form a complete IPC message.
116enum IpcBodySink<'a> {
117    /// Serialize buffer bytes (with padding) into a contiguous byte vec.
    /// Used by `IpcDataGenerator::encode`.
118    Write(&'a mut Vec<u8>),
119    /// Accumulate pre-encoded buffer segments for deferred zero-copy streaming.
    /// Used by `IpcDataGenerator::write`; padding is not stored here and is
    /// recomputed from each segment's length when the segments are written out.
120    Collect(&'a mut Vec<EncodedBuffer>),
121}
122impl<'a> IpcBodySink<'a> {
123    /// Writes the encoded buffer to the sink.
124    pub fn write(&mut self, pad_len: usize, buffer: EncodedBuffer) {
125        match self {
126            IpcBodySink::Write(vec) => {
127                vec.extend_from_slice(buffer.as_slice());
128                vec.extend_from_slice(&PADDING[..pad_len]);
129            }
130            IpcBodySink::Collect(vec) => {
131                vec.push(buffer);
132            }
133        }
134    }
135}
136
137/// Per-message sizes produced by [`IpcDataGenerator::write`].
138///
139/// [`FileWriter`] uses these to build the Block index entries required by the IPC footer for
140/// random-access reads.
141struct IpcWriteMetadata {
142    /// Per-dictionary `(padded_header_len, body_len)` for each dictionary batch written
143    /// before the record batch, in the order the dictionary batches were written.
144    dictionary_block_sizes: Vec<(usize, usize)>,
145    /// Flatbuffer header size including the 4-byte (legacy) or 8-byte prefix and alignment padding.
146    padded_header_len: usize,
147    /// Total length of the record-batch body including trailing alignment padding.
148    body_len: usize,
149}
150
151impl IpcWriteOptions {
152    /// Configures compression when writing IPC files.
153    ///
154    /// Will result in a runtime error if the corresponding feature
155    /// is not enabled
156    pub fn try_with_compression(
157        mut self,
158        batch_compression_type: Option<crate::CompressionType>,
159    ) -> Result<Self, ArrowError> {
160        self.batch_compression_type = batch_compression_type;
161
162        if self.batch_compression_type.is_some()
163            && self.metadata_version < crate::MetadataVersion::V5
164        {
165            return Err(ArrowError::InvalidArgumentError(
166                "Compression only supported in metadata v5 and above".to_string(),
167            ));
168        }
169        Ok(self)
170    }
171
172    /// Configures the compression level used when writing compressed IPC batches.
173    ///
174    /// Compression levels require metadata V5 or newer and are currently only
175    /// supported for ZSTD compression.
176    pub fn try_with_compression_level(
177        mut self,
178        batch_compression_level: Option<i32>,
179    ) -> Result<Self, ArrowError> {
180        self.batch_compression_level = batch_compression_level;
181
182        if self.batch_compression_level.is_some()
183            && self.metadata_version < crate::MetadataVersion::V5
184        {
185            return Err(ArrowError::InvalidArgumentError(
186                "Compression only supported in metadata v5 and above".to_string(),
187            ));
188        }
189
190        match (self.batch_compression_type, self.batch_compression_level) {
191            (Some(crate::CompressionType::ZSTD), Some(level)) => {
192                return self.check_zstd_level(level);
193            }
194            (Some(crate::CompressionType::LZ4_FRAME), Some(_)) => {
195                return Err(ArrowError::InvalidArgumentError(
196                    "LZ4 Frame compression does not support configurable compression levels"
197                        .to_string(),
198                ));
199            }
200            _ => {}
201        }
202
203        Ok(self)
204    }
205
206    #[cfg(not(feature = "zstd"))]
207    fn check_zstd_level(self, _level: i32) -> Result<Self, ArrowError> {
208        Err(ArrowError::InvalidArgumentError(
209            "zstd IPC compression requires the zstd feature".to_string(),
210        ))
211    }
212
213    #[cfg(feature = "zstd")]
214    fn check_zstd_level(self, level: i32) -> Result<Self, ArrowError> {
215        let range = zstd::compression_level_range();
216        if !range.contains(&(level as zstd::zstd_safe::CompressionLevel)) {
217            return Err(ArrowError::InvalidArgumentError(format!(
218                "ZSTD compression level must be between {} and {}, got {}",
219                range.start(),
220                range.end(),
221                level,
222            )));
223        }
224
225        Ok(self)
226    }
227
228    /// Try to create IpcWriteOptions, checking for incompatible settings
229    pub fn try_new(
230        alignment: usize,
231        write_legacy_ipc_format: bool,
232        metadata_version: crate::MetadataVersion,
233    ) -> Result<Self, ArrowError> {
234        let is_alignment_valid =
235            alignment == 8 || alignment == 16 || alignment == 32 || alignment == 64;
236        if !is_alignment_valid {
237            return Err(ArrowError::InvalidArgumentError(
238                "Alignment should be 8, 16, 32, or 64.".to_string(),
239            ));
240        }
241        let alignment: u8 = u8::try_from(alignment).expect("range already checked");
242        match metadata_version {
243            crate::MetadataVersion::V1
244            | crate::MetadataVersion::V2
245            | crate::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
246                "Writing IPC metadata version 3 and lower not supported".to_string(),
247            )),
248            #[allow(deprecated)]
249            crate::MetadataVersion::V4 => Ok(Self {
250                alignment,
251                write_legacy_ipc_format,
252                metadata_version,
253                batch_compression_type: None,
254                batch_compression_level: None,
255                dictionary_handling: DictionaryHandling::default(),
256            }),
257            crate::MetadataVersion::V5 => {
258                if write_legacy_ipc_format {
259                    Err(ArrowError::InvalidArgumentError(
260                        "Legacy IPC format only supported on metadata version 4".to_string(),
261                    ))
262                } else {
263                    Ok(Self {
264                        alignment,
265                        write_legacy_ipc_format,
266                        metadata_version,
267                        batch_compression_type: None,
268                        batch_compression_level: None,
269                        dictionary_handling: DictionaryHandling::default(),
270                    })
271                }
272            }
273            z => Err(ArrowError::InvalidArgumentError(format!(
274                "Unsupported crate::MetadataVersion {z:?}"
275            ))),
276        }
277    }
278
279    /// Configure how dictionaries are handled in IPC messages
280    pub fn with_dictionary_handling(mut self, dictionary_handling: DictionaryHandling) -> Self {
281        self.dictionary_handling = dictionary_handling;
282        self
283    }
284}
285
286impl Default for IpcWriteOptions {
287    fn default() -> Self {
288        Self {
289            alignment: 64,
290            write_legacy_ipc_format: false,
291            metadata_version: crate::MetadataVersion::V5,
292            batch_compression_type: None,
293            batch_compression_level: None,
294            dictionary_handling: DictionaryHandling::default(),
295        }
296    }
297}
298
299#[derive(Debug, Default)]
300/// Handles low level details of encoding [`Array`] and [`Schema`] into the
301/// [Arrow IPC Format].
///
/// The generator itself holds no state; per-stream state is passed in explicitly
/// through a [`DictionaryTracker`] and a [`CompressionContext`].
302///
303/// # Example
304/// ```
305/// # fn run() {
306/// # use std::sync::Arc;
307/// # use arrow_array::UInt64Array;
308/// # use arrow_array::RecordBatch;
309/// # use arrow_ipc::writer::{CompressionContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
310///
311/// // Create a record batch
312/// let batch = RecordBatch::try_from_iter(vec![
313///  ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _)
314/// ]).unwrap();
315///
316/// // Error if dictionary ids are replaced.
317/// let error_on_replacement = true;
318/// let options = IpcWriteOptions::default();
319/// let mut dictionary_tracker = DictionaryTracker::new(error_on_replacement);
320///
321/// let mut compression_context = CompressionContext::default();
322///
323/// // encode the batch into zero or more encoded dictionaries
324/// // and the data for the actual array.
325/// let data_gen = IpcDataGenerator::default();
326/// let (encoded_dictionaries, encoded_message) = data_gen
327///   .encode(&batch, &mut dictionary_tracker, &options, &mut compression_context)
328///   .unwrap();
329/// # }
330/// ```
331///
332/// [Arrow IPC Format]: https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc
333pub struct IpcDataGenerator {}
334
335impl IpcDataGenerator {
336    /// Converts a schema to an IPC message along with `dictionary_tracker`
337    /// and returns it encoded inside [EncodedData] as a flatbuffer.
338    pub fn schema_to_bytes_with_dictionary_tracker(
339        &self,
340        schema: &Schema,
341        dictionary_tracker: &mut DictionaryTracker,
342        write_options: &IpcWriteOptions,
343    ) -> EncodedData {
344        let mut fbb = FlatBufferBuilder::new();
345        let schema = {
346            let fb = IpcSchemaEncoder::new()
347                .with_dictionary_tracker(dictionary_tracker)
348                .schema_to_fb_offset(&mut fbb, schema);
349            fb.as_union_value()
350        };
351
352        let mut message = crate::MessageBuilder::new(&mut fbb);
353        message.add_version(write_options.metadata_version);
354        message.add_header_type(crate::MessageHeader::Schema);
355        message.add_bodyLength(0);
356        message.add_header(schema);
357        // TODO: custom metadata
358        let data = message.finish();
359        fbb.finish(data, None);
360
361        let data = fbb.finished_data();
362        EncodedData {
363            ipc_message: data.to_vec(),
364            arrow_data: vec![],
365        }
366    }
367
368    fn _encode_dictionaries<I: Iterator<Item = i64>>(
369        &self,
370        column: &ArrayRef,
371        encoded_dictionaries: &mut Vec<EncodedData>,
372        dictionary_tracker: &mut DictionaryTracker,
373        write_options: &IpcWriteOptions,
374        dict_id: &mut I,
375        compression_context: &mut CompressionContext,
376    ) -> Result<(), ArrowError> {
377        match column.data_type() {
378            DataType::Struct(fields) => {
379                let s = as_struct_array(column);
380                for (field, column) in fields.iter().zip(s.columns()) {
381                    self.encode_dictionaries(
382                        field,
383                        column,
384                        encoded_dictionaries,
385                        dictionary_tracker,
386                        write_options,
387                        dict_id,
388                        compression_context,
389                    )?;
390                }
391            }
392            DataType::RunEndEncoded(_, values) => {
393                let data = column.to_data();
394                if data.child_data().len() != 2 {
395                    return Err(ArrowError::InvalidArgumentError(format!(
396                        "The run encoded array should have exactly two child arrays. Found {}",
397                        data.child_data().len()
398                    )));
399                }
400                // The run_ends array is not expected to be dictionary encoded. Hence encode dictionaries
401                // only for values array.
402                let values_array = make_array(data.child_data()[1].clone());
403                self.encode_dictionaries(
404                    values,
405                    &values_array,
406                    encoded_dictionaries,
407                    dictionary_tracker,
408                    write_options,
409                    dict_id,
410                    compression_context,
411                )?;
412            }
413            DataType::List(field) => {
414                let list = as_list_array(column);
415                self.encode_dictionaries(
416                    field,
417                    list.values(),
418                    encoded_dictionaries,
419                    dictionary_tracker,
420                    write_options,
421                    dict_id,
422                    compression_context,
423                )?;
424            }
425            DataType::LargeList(field) => {
426                let list = as_large_list_array(column);
427                self.encode_dictionaries(
428                    field,
429                    list.values(),
430                    encoded_dictionaries,
431                    dictionary_tracker,
432                    write_options,
433                    dict_id,
434                    compression_context,
435                )?;
436            }
437            DataType::ListView(field) => {
438                let list = column.as_list_view::<i32>();
439                self.encode_dictionaries(
440                    field,
441                    list.values(),
442                    encoded_dictionaries,
443                    dictionary_tracker,
444                    write_options,
445                    dict_id,
446                    compression_context,
447                )?;
448            }
449            DataType::LargeListView(field) => {
450                let list = column.as_list_view::<i64>();
451                self.encode_dictionaries(
452                    field,
453                    list.values(),
454                    encoded_dictionaries,
455                    dictionary_tracker,
456                    write_options,
457                    dict_id,
458                    compression_context,
459                )?;
460            }
461            DataType::FixedSizeList(field, _) => {
462                let list = column
463                    .as_any()
464                    .downcast_ref::<FixedSizeListArray>()
465                    .expect("Unable to downcast to fixed size list array");
466                self.encode_dictionaries(
467                    field,
468                    list.values(),
469                    encoded_dictionaries,
470                    dictionary_tracker,
471                    write_options,
472                    dict_id,
473                    compression_context,
474                )?;
475            }
476            DataType::Map(field, _) => {
477                let map_array = as_map_array(column);
478
479                let (keys, values) = match field.data_type() {
480                    DataType::Struct(fields) if fields.len() == 2 => (&fields[0], &fields[1]),
481                    _ => panic!("Incorrect field data type {:?}", field.data_type()),
482                };
483
484                // keys
485                self.encode_dictionaries(
486                    keys,
487                    map_array.keys(),
488                    encoded_dictionaries,
489                    dictionary_tracker,
490                    write_options,
491                    dict_id,
492                    compression_context,
493                )?;
494
495                // values
496                self.encode_dictionaries(
497                    values,
498                    map_array.values(),
499                    encoded_dictionaries,
500                    dictionary_tracker,
501                    write_options,
502                    dict_id,
503                    compression_context,
504                )?;
505            }
506            DataType::Union(fields, _) => {
507                let union = as_union_array(column);
508                for (type_id, field) in fields.iter() {
509                    let column = union.child(type_id);
510                    self.encode_dictionaries(
511                        field,
512                        column,
513                        encoded_dictionaries,
514                        dictionary_tracker,
515                        write_options,
516                        dict_id,
517                        compression_context,
518                    )?;
519                }
520            }
521            _ => (),
522        }
523
524        Ok(())
525    }
526
527    #[allow(clippy::too_many_arguments)]
528    fn encode_dictionaries<I: Iterator<Item = i64>>(
529        &self,
530        field: &Field,
531        column: &ArrayRef,
532        encoded_dictionaries: &mut Vec<EncodedData>,
533        dictionary_tracker: &mut DictionaryTracker,
534        write_options: &IpcWriteOptions,
535        dict_id_seq: &mut I,
536        compression_context: &mut CompressionContext,
537    ) -> Result<(), ArrowError> {
538        match column.data_type() {
539            DataType::Dictionary(_key_type, _value_type) => {
540                let dict_data = column.to_data();
541                let dict_values = &dict_data.child_data()[0];
542
543                let values = make_array(dict_data.child_data()[0].clone());
544
545                self._encode_dictionaries(
546                    &values,
547                    encoded_dictionaries,
548                    dictionary_tracker,
549                    write_options,
550                    dict_id_seq,
551                    compression_context,
552                )?;
553
554                // It's important to only take the dict_id at this point, because the dict ID
555                // sequence is assigned depth-first, so we need to first encode children and have
556                // them take their assigned dict IDs before we take the dict ID for this field.
557                let dict_id = dict_id_seq.next().ok_or_else(|| {
558                    ArrowError::IpcError(format!(
559                        "no dict id for field {:?}: field.data_type={:?}, column.data_type={:?}",
560                        field.name(),
561                        field.data_type(),
562                        column.data_type()
563                    ))
564                })?;
565
566                match dictionary_tracker.insert_column(
567                    dict_id,
568                    column,
569                    write_options.dictionary_handling,
570                )? {
571                    DictionaryUpdate::None => {}
572                    DictionaryUpdate::New | DictionaryUpdate::Replaced => {
573                        encoded_dictionaries.push(self.dictionary_batch_to_bytes(
574                            dict_id,
575                            dict_values,
576                            write_options,
577                            false,
578                            compression_context,
579                        )?);
580                    }
581                    DictionaryUpdate::Delta(data) => {
582                        encoded_dictionaries.push(self.dictionary_batch_to_bytes(
583                            dict_id,
584                            &data,
585                            write_options,
586                            true,
587                            compression_context,
588                        )?);
589                    }
590                }
591            }
592            _ => self._encode_dictionaries(
593                column,
594                encoded_dictionaries,
595                dictionary_tracker,
596                write_options,
597                dict_id_seq,
598                compression_context,
599            )?,
600        }
601
602        Ok(())
603    }
604
605    /// Encodes a batch to a number of [EncodedData] items (dictionary batches + the record batch).
606    /// The [DictionaryTracker] keeps track of dictionaries with new `dict_id`s  (so they are only sent once)
607    /// Make sure the [DictionaryTracker] is initialized at the start of the stream.
608    pub fn encode(
609        &self,
610        batch: &RecordBatch,
611        dictionary_tracker: &mut DictionaryTracker,
612        write_options: &IpcWriteOptions,
613        compression_context: &mut CompressionContext,
614    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
615        let encoded_dictionaries = self.encode_all_dicts(
616            batch,
617            dictionary_tracker,
618            write_options,
619            compression_context,
620        )?;
621        let mut arrow_data = Vec::new();
622        let (ipc_message, _, tail_pad) = self.record_batch_to_bytes(
623            batch,
624            write_options,
625            compression_context,
626            &mut IpcBodySink::Write(&mut arrow_data),
627        )?;
628        arrow_data.extend_from_slice(&PADDING[..tail_pad]);
629        Ok((
630            encoded_dictionaries,
631            EncodedData {
632                ipc_message,
633                arrow_data,
634            },
635        ))
636    }
637
638    /// Encode dictionary batches for all columns in `batch`.
639    fn encode_all_dicts(
640        &self,
641        batch: &RecordBatch,
642        dictionary_tracker: &mut DictionaryTracker,
643        write_options: &IpcWriteOptions,
644        compression_context: &mut CompressionContext,
645    ) -> Result<Vec<EncodedData>, ArrowError> {
646        let schema = batch.schema();
647        let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len());
648        let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter();
649        for (i, field) in schema.fields().iter().enumerate() {
650            self.encode_dictionaries(
651                field,
652                batch.column(i),
653                &mut encoded_dictionaries,
654                dictionary_tracker,
655                write_options,
656                &mut dict_id,
657                compression_context,
658            )?;
659        }
660        Ok(encoded_dictionaries)
661    }
662
663    /// Write dictionary batches and the record batch directly to `writer`, skipping the
664    /// intermediate body `Vec<u8>` allocations
665    /// Returns [`IpcWriteMetadata`] with the sizes needed to build footer blocks.
666    fn write<W: Write>(
667        &self,
668        batch: &RecordBatch,
669        dictionary_tracker: &mut DictionaryTracker,
670        write_options: &IpcWriteOptions,
671        compression_context: &mut CompressionContext,
672        writer: &mut W,
673    ) -> Result<IpcWriteMetadata, ArrowError> {
674        let encoded_dictionaries = self.encode_all_dicts(
675            batch,
676            dictionary_tracker,
677            write_options,
678            compression_context,
679        )?;
680
681        let mut dictionary_block_sizes = Vec::with_capacity(encoded_dictionaries.len());
682        for dict in encoded_dictionaries {
683            dictionary_block_sizes.push(write_message(&mut *writer, dict, write_options)?);
684        }
685
686        let capacity = batch
687            .columns()
688            .iter()
689            .map(|a| estimate_encoded_buffer_count(a.data_type()))
690            .sum();
691        let mut encoded_buffers: Vec<EncodedBuffer> = Vec::with_capacity(capacity);
692        let (ipc_message, body_len, tail_pad) = self.record_batch_to_bytes(
693            batch,
694            write_options,
695            compression_context,
696            &mut IpcBodySink::Collect(&mut encoded_buffers),
697        )?;
698
699        let alignment = write_options.alignment;
700        let a = usize::from(alignment - 1);
701        let prefix_size = if write_options.write_legacy_ipc_format {
702            4
703        } else {
704            8
705        };
706        let aligned_size = (ipc_message.len() + prefix_size + a) & !a;
707        write_continuation(
708            &mut *writer,
709            write_options,
710            (aligned_size - prefix_size) as i32,
711        )?;
712        writer.write_all(&ipc_message)?;
713        writer.write_all(&PADDING[..aligned_size - ipc_message.len() - prefix_size])?;
714        for enc in &encoded_buffers {
715            writer.write_all(enc.as_slice())?;
716            writer.write_all(&PADDING[..pad_to_alignment(alignment, enc.len())])?;
717        }
718        writer.write_all(&PADDING[..tail_pad])?;
719
720        Ok(IpcWriteMetadata {
721            dictionary_block_sizes,
722            padded_header_len: aligned_size,
723            body_len,
724        })
725    }
726
727    /// Encodes a batch to a number of [EncodedData] items (dictionary batches + the record batch).
728    /// The [DictionaryTracker] keeps track of dictionaries with new `dict_id`s  (so they are only sent once)
729    /// Make sure the [DictionaryTracker] is initialized at the start of the stream.
730    #[deprecated(since = "57.0.0", note = "Use `encode` instead")]
731    pub fn encoded_batch(
732        &self,
733        batch: &RecordBatch,
734        dictionary_tracker: &mut DictionaryTracker,
735        write_options: &IpcWriteOptions,
736    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
737        self.encode(
738            batch,
739            dictionary_tracker,
740            write_options,
741            &mut Default::default(),
742        )
743    }
744
745    /// Encodes a `RecordBatch` into a flatbuffer IPC message and fills `sink` with the
746    /// serialised buffer data.
747    ///
748    /// Returns `(ipc_message, body_len, tail_pad)`: the flatbuffer header bytes, the
749    /// total body length including trailing padding, and the trailing alignment padding byte count.
750    fn record_batch_to_bytes(
751        &self,
752        batch: &RecordBatch,
753        write_options: &IpcWriteOptions,
754        compression_context: &mut CompressionContext,
755        sink: &mut IpcBodySink<'_>,
756    ) -> Result<(Vec<u8>, usize, usize), ArrowError> {
757        let mut fbb = FlatBufferBuilder::new();
758
759        let batch_compression_type = write_options.batch_compression_type;
760
761        let compression = batch_compression_type.map(|batch_compression_type| {
762            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
763            c.add_method(crate::BodyCompressionMethod::BUFFER);
764            c.add_codec(batch_compression_type);
765            c.finish()
766        });
767
768        let batch_compression_level = write_options.batch_compression_level;
769        let compression_codec: Option<CompressionCodec> = batch_compression_type
770            .map(|compression_type| match batch_compression_level {
771                Some(level) => {
772                    CompressionCodec::try_new_with_compression_level(compression_type, level)
773                }
774                None => compression_type.try_into(),
775            })
776            .transpose()?;
777
778        let alignment = write_options.alignment;
779        let mut variadic_buffer_counts = vec![];
780        let mut meta = IpcMetadataBuilder::default();
781        let mut offset = 0i64;
782
783        for array in batch.columns() {
784            let array_data = array.to_data();
785            offset = write_array_data(
786                &array_data,
787                &mut meta,
788                sink,
789                offset,
790                compression_codec,
791                compression_context,
792                write_options,
793            )?;
794            append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data);
795        }
796
797        let tail_pad = pad_to_alignment(alignment, offset as usize);
798        let body_len = offset as usize + tail_pad;
799
800        let buffers = fbb.create_vector(&meta.buffers);
801        let nodes = fbb.create_vector(&meta.nodes);
802        let variadic_buffer = if variadic_buffer_counts.is_empty() {
803            None
804        } else {
805            Some(fbb.create_vector(&variadic_buffer_counts))
806        };
807
808        let root = {
809            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
810            batch_builder.add_length(batch.num_rows() as i64);
811            batch_builder.add_nodes(nodes);
812            batch_builder.add_buffers(buffers);
813            if let Some(c) = compression {
814                batch_builder.add_compression(c);
815            }
816            if let Some(v) = variadic_buffer {
817                batch_builder.add_variadicBufferCounts(v);
818            }
819            batch_builder.finish().as_union_value()
820        };
821        // create an crate::Message
822        let mut message = crate::MessageBuilder::new(&mut fbb);
823        message.add_version(write_options.metadata_version);
824        message.add_header_type(crate::MessageHeader::RecordBatch);
825        message.add_bodyLength(body_len as i64);
826        message.add_header(root);
827        let root = message.finish();
828        fbb.finish(root, None);
829
830        Ok((fbb.finished_data().to_vec(), body_len, tail_pad))
831    }
832
833    /// Write dictionary values into two sets of bytes, one for the header (crate::Message) and the
834    /// other for the data
835    fn dictionary_batch_to_bytes(
836        &self,
837        dict_id: i64,
838        array_data: &ArrayData,
839        write_options: &IpcWriteOptions,
840        is_delta: bool,
841        compression_context: &mut CompressionContext,
842    ) -> Result<EncodedData, ArrowError> {
843        let mut fbb = FlatBufferBuilder::new();
844
845        let mut arrow_data: Vec<u8> = vec![];
846
847        // get the type of compression
848        let batch_compression_type = write_options.batch_compression_type;
849
850        let compression = batch_compression_type.map(|batch_compression_type| {
851            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
852            c.add_method(crate::BodyCompressionMethod::BUFFER);
853            c.add_codec(batch_compression_type);
854            c.finish()
855        });
856
857        let batch_compression_level = write_options.batch_compression_level;
858        let compression_codec: Option<CompressionCodec> = batch_compression_type
859            .map(|batch_compression_type| match batch_compression_level {
860                Some(level) => {
861                    CompressionCodec::try_new_with_compression_level(batch_compression_type, level)
862                }
863                None => batch_compression_type.try_into(),
864            })
865            .transpose()?;
866
867        let alignment = write_options.alignment;
868        let mut meta = IpcMetadataBuilder::default();
869        let mut sink = IpcBodySink::Write(&mut arrow_data);
870        let offset = write_array_data(
871            array_data,
872            &mut meta,
873            &mut sink,
874            0,
875            compression_codec,
876            compression_context,
877            write_options,
878        )?;
879
880        let mut variadic_buffer_counts = vec![];
881        append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);
882
883        // pad the tail of body data
884        let tail_pad = pad_to_alignment(alignment, offset as usize);
885        let body_len = offset as usize + tail_pad;
886        arrow_data.extend_from_slice(&PADDING[..tail_pad]);
887
888        // write data
889        let buffers = fbb.create_vector(&meta.buffers);
890        let nodes = fbb.create_vector(&meta.nodes);
891        let variadic_buffer = if variadic_buffer_counts.is_empty() {
892            None
893        } else {
894            Some(fbb.create_vector(&variadic_buffer_counts))
895        };
896
897        let root = {
898            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
899            batch_builder.add_length(array_data.len() as i64);
900            batch_builder.add_nodes(nodes);
901            batch_builder.add_buffers(buffers);
902            if let Some(c) = compression {
903                batch_builder.add_compression(c);
904            }
905            if let Some(v) = variadic_buffer {
906                batch_builder.add_variadicBufferCounts(v);
907            }
908            batch_builder.finish()
909        };
910
911        let root = {
912            let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut fbb);
913            batch_builder.add_id(dict_id);
914            batch_builder.add_data(root);
915            batch_builder.add_isDelta(is_delta);
916            batch_builder.finish().as_union_value()
917        };
918
919        let root = {
920            let mut message_builder = crate::MessageBuilder::new(&mut fbb);
921            message_builder.add_version(write_options.metadata_version);
922            message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
923            message_builder.add_bodyLength(body_len as i64);
924            message_builder.add_header(root);
925            message_builder.finish()
926        };
927
928        fbb.finish(root, None);
929        let finished_data = fbb.finished_data();
930
931        Ok(EncodedData {
932            ipc_message: finished_data.to_vec(),
933            arrow_data,
934        })
935    }
936}
937
938fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
939    match array.data_type() {
940        DataType::BinaryView | DataType::Utf8View => {
941            // The spec documents the counts only includes the variadic buffers, not the view/null buffers.
942            // https://arrow.apache.org/docs/format/Columnar.html#variadic-buffers
943            counts.push(array.buffers().len() as i64 - 1);
944        }
945        DataType::Dictionary(_, _) => {
946            // Do nothing
947            // Dictionary types are handled in `encode_dictionaries`.
948        }
949        _ => {
950            for child in array.child_data() {
951                append_variadic_buffer_counts(counts, child)
952            }
953        }
954    }
955}
956
957pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError> {
958    match arr.data_type() {
959        DataType::RunEndEncoded(k, _) => match k.data_type() {
960            DataType::Int16 => {
961                Ok(into_zero_offset_run_array(RunArray::<Int16Type>::from(arr))?.into_data())
962            }
963            DataType::Int32 => {
964                Ok(into_zero_offset_run_array(RunArray::<Int32Type>::from(arr))?.into_data())
965            }
966            DataType::Int64 => {
967                Ok(into_zero_offset_run_array(RunArray::<Int64Type>::from(arr))?.into_data())
968            }
969            d => unreachable!("Unexpected data type {d}"),
970        },
971        d => Err(ArrowError::InvalidArgumentError(format!(
972            "The given array is not a run array. Data type of given array: {d}"
973        ))),
974    }
975}
976
977// Returns a `RunArray` with zero offset and length matching the last value
978// in run_ends array.
979fn into_zero_offset_run_array<R: RunEndIndexType>(
980    run_array: RunArray<R>,
981) -> Result<RunArray<R>, ArrowError> {
982    let run_ends = run_array.run_ends();
983    if run_ends.offset() == 0 && run_ends.max_value() == run_ends.len() {
984        return Ok(run_array);
985    }
986
987    // The physical index of original run_ends array from which the `ArrayData`is sliced.
988    let start_physical_index = run_ends.get_start_physical_index();
989
990    // The physical index of original run_ends array until which the `ArrayData`is sliced.
991    let end_physical_index = run_ends.get_end_physical_index();
992
993    let physical_length = end_physical_index - start_physical_index + 1;
994
995    // build new run_ends array by subtracting offset from run ends.
996    let offset = R::Native::usize_as(run_ends.offset());
997    let mut builder = BufferBuilder::<R::Native>::new(physical_length);
998    for run_end_value in &run_ends.values()[start_physical_index..end_physical_index] {
999        builder.append(run_end_value.sub_wrapping(offset));
1000    }
1001    builder.append(R::Native::from_usize(run_array.len()).unwrap());
1002    let new_run_ends = unsafe {
1003        // Safety:
1004        // The function builds a valid run_ends array and hence need not be validated.
1005        ArrayDataBuilder::new(R::DATA_TYPE)
1006            .len(physical_length)
1007            .add_buffer(builder.finish())
1008            .build_unchecked()
1009    };
1010
1011    // build new values by slicing physical indices.
1012    let new_values = run_array
1013        .values()
1014        .slice(start_physical_index, physical_length)
1015        .into_data();
1016
1017    let builder = ArrayDataBuilder::new(run_array.data_type().clone())
1018        .len(run_array.len())
1019        .add_child_data(new_run_ends)
1020        .add_child_data(new_values);
1021    let array_data = unsafe {
1022        // Safety:
1023        //  This function builds a valid run array and hence can skip validation.
1024        builder.build_unchecked()
1025    };
1026    Ok(array_data.into())
1027}
1028
/// Controls how dictionaries are handled in Arrow IPC messages
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DictionaryHandling {
    /// Send the entire dictionary whenever it changes (default)
    ///
    /// A dictionary identical to the one already sent for the same ID is not sent again.
    #[default]
    Resend,
    /// Send only new dictionary values since the last batch (delta encoding)
    ///
    /// When a dictionary is first encountered, the entire dictionary is sent.
    /// For subsequent batches, if the new dictionary starts with all of the
    /// previously sent values, only the values appended after them are
    /// transmitted, with the `isDelta` flag set to true. If the new dictionary
    /// is not such an extension, the entire dictionary is sent again as a
    /// replacement.
    Delta,
}
1042
1043/// Describes what kind of update took place after a call to [`DictionaryTracker::insert`].
1044#[derive(Debug, Clone)]
1045pub enum DictionaryUpdate {
1046    /// No dictionary was written, the dictionary was identical to what was already
1047    /// in the tracker.
1048    None,
1049    /// No dictionary was present in the tracker
1050    New,
1051    /// Dictionary was replaced with the new data
1052    Replaced,
1053    /// Dictionary was updated, ArrayData is the delta between old and new
1054    Delta(ArrayData),
1055}
1056
1057/// Keeps track of dictionaries that have been written, to avoid emitting the same dictionary
1058/// multiple times.
1059///
1060/// Can optionally error if an update to an existing dictionary is attempted, which
1061/// isn't allowed in the `FileWriter`.
1062#[derive(Debug)]
1063pub struct DictionaryTracker {
1064    // NOTE: When adding fields, update the clear() method accordingly.
1065    written: HashMap<i64, ArrayData>,
1066    dict_ids: Vec<i64>,
1067    error_on_replacement: bool,
1068}
1069
1070impl DictionaryTracker {
1071    /// Create a new [`DictionaryTracker`].
1072    ///
1073    /// If `error_on_replacement`
1074    /// is true, an error will be generated if an update to an
1075    /// existing dictionary is attempted.
1076    pub fn new(error_on_replacement: bool) -> Self {
1077        #[allow(deprecated)]
1078        Self {
1079            written: HashMap::new(),
1080            dict_ids: Vec::new(),
1081            error_on_replacement,
1082        }
1083    }
1084
1085    /// Record and return the next dictionary ID.
1086    pub fn next_dict_id(&mut self) -> i64 {
1087        let next = self
1088            .dict_ids
1089            .last()
1090            .copied()
1091            .map(|i| i + 1)
1092            .unwrap_or_default();
1093
1094        self.dict_ids.push(next);
1095        next
1096    }
1097
1098    /// Return the sequence of dictionary IDs in the order they should be observed while
1099    /// traversing the schema
1100    pub fn dict_id(&mut self) -> &[i64] {
1101        &self.dict_ids
1102    }
1103
1104    /// Keep track of the dictionary with the given ID and values. Behavior:
1105    ///
1106    /// * If this ID has been written already and has the same data, return `Ok(false)` to indicate
1107    ///   that the dictionary was not actually inserted (because it's already been seen).
1108    /// * If this ID has been written already but with different data, and this tracker is
1109    ///   configured to return an error, return an error.
1110    /// * If the tracker has not been configured to error on replacement or this dictionary
1111    ///   has never been seen before, return `Ok(true)` to indicate that the dictionary was just
1112    ///   inserted.
1113    #[deprecated(since = "56.1.0", note = "Use `insert_column` instead")]
1114    pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> {
1115        let dict_data = column.to_data();
1116        let dict_values = &dict_data.child_data()[0];
1117
1118        // If a dictionary with this id was already emitted, check if it was the same.
1119        if let Some(last) = self.written.get(&dict_id) {
1120            if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
1121                // Same dictionary values => no need to emit it again
1122                return Ok(false);
1123            }
1124            if self.error_on_replacement {
1125                // If error on replacement perform a logical comparison
1126                if last.child_data()[0] == *dict_values {
1127                    // Same dictionary values => no need to emit it again
1128                    return Ok(false);
1129                }
1130                return Err(ArrowError::InvalidArgumentError(
1131                    "Dictionary replacement detected when writing IPC file format. \
1132                     Arrow IPC files only support a single dictionary for a given field \
1133                     across all batches."
1134                        .to_string(),
1135                ));
1136            }
1137        }
1138
1139        self.written.insert(dict_id, dict_data);
1140        Ok(true)
1141    }
1142
1143    /// Keep track of the dictionary with the given ID and values. The return
1144    /// value indicates what, if any, update to the internal map took place
1145    /// and how it should be interpreted based on the `dict_handling` parameter.
1146    ///
1147    /// # Returns
1148    ///
1149    /// * `Ok(Dictionary::New)` - If the dictionary was not previously written
1150    /// * `Ok(Dictionary::Replaced)` - If the dictionary was previously written
1151    ///   with completely different data, or if the data is a delta of the existing,
1152    ///   but with `dict_handling` set to `DictionaryHandling::Resend`
1153    /// * `Ok(Dictionary::Delta)` - If the dictionary was previously written, but
1154    ///   the new data is a delta of the old and the `dict_handling` is set to
1155    ///   `DictionaryHandling::Delta`
1156    /// * `Err(e)` - If the dictionary was previously written with different data,
1157    ///   and `error_on_replacement` is set to `true`.
1158    pub fn insert_column(
1159        &mut self,
1160        dict_id: i64,
1161        column: &ArrayRef,
1162        dict_handling: DictionaryHandling,
1163    ) -> Result<DictionaryUpdate, ArrowError> {
1164        let new_data = column.to_data();
1165        let new_values = &new_data.child_data()[0];
1166
1167        // If there is no existing dictionary with this ID, we always insert
1168        let Some(old) = self.written.get(&dict_id) else {
1169            self.written.insert(dict_id, new_data);
1170            return Ok(DictionaryUpdate::New);
1171        };
1172
1173        // Fast path - If the array data points to the same buffer as the
1174        // existing then they're the same.
1175        let old_values = &old.child_data()[0];
1176        if ArrayData::ptr_eq(old_values, new_values) {
1177            return Ok(DictionaryUpdate::None);
1178        }
1179
1180        // Slow path - Compare the dictionaries value by value
1181        let comparison = compare_dictionaries(old_values, new_values);
1182        if matches!(comparison, DictionaryComparison::Equal) {
1183            return Ok(DictionaryUpdate::None);
1184        }
1185
1186        const REPLACEMENT_ERROR: &str = "Dictionary replacement detected when writing IPC file format. \
1187                 Arrow IPC files only support a single dictionary for a given field \
1188                 across all batches.";
1189
1190        match comparison {
1191            DictionaryComparison::NotEqual => {
1192                if self.error_on_replacement {
1193                    return Err(ArrowError::InvalidArgumentError(
1194                        REPLACEMENT_ERROR.to_string(),
1195                    ));
1196                }
1197
1198                self.written.insert(dict_id, new_data);
1199                Ok(DictionaryUpdate::Replaced)
1200            }
1201            DictionaryComparison::Delta => match dict_handling {
1202                DictionaryHandling::Resend => {
1203                    if self.error_on_replacement {
1204                        return Err(ArrowError::InvalidArgumentError(
1205                            REPLACEMENT_ERROR.to_string(),
1206                        ));
1207                    }
1208
1209                    self.written.insert(dict_id, new_data);
1210                    Ok(DictionaryUpdate::Replaced)
1211                }
1212                DictionaryHandling::Delta => {
1213                    let delta =
1214                        new_values.slice(old_values.len(), new_values.len() - old_values.len());
1215                    self.written.insert(dict_id, new_data);
1216                    Ok(DictionaryUpdate::Delta(delta))
1217                }
1218            },
1219            DictionaryComparison::Equal => unreachable!("Already checked equal case"),
1220        }
1221    }
1222
1223    /// Clears the state of the dictionary tracker.
1224    ///
1225    /// This allows the dictionary tracker to be reused for a new IPC stream while avoiding the
1226    /// allocation cost of creating a new instance. This method should not be called if
1227    /// the dictionary tracker will be used to continue writing to an existing IPC stream.
1228    pub fn clear(&mut self) {
1229        self.dict_ids.clear();
1230        self.written.clear();
1231    }
1232}
1233
/// Outcome of comparing a newly seen dictionary with the one written previously.
#[derive(Clone, Debug)]
enum DictionaryComparison {
    /// The dictionaries differ, and the new one does not extend the old one
    NotEqual,
    /// Both dictionaries hold exactly the same values, element by element
    Equal,
    /// The new dictionary is longer than the old one and begins with all of the
    /// old values, i.e. the old dictionary is a prefix of the new one.
    Delta,
}
1245
1246// Compares two dictionaries and returns a [`DictionaryComparison`].
1247fn compare_dictionaries(old: &ArrayData, new: &ArrayData) -> DictionaryComparison {
1248    // Check for exact match
1249    let existing_len = old.len();
1250    let new_len = new.len();
1251    if existing_len == new_len {
1252        if *old == *new {
1253            return DictionaryComparison::Equal;
1254        } else {
1255            return DictionaryComparison::NotEqual;
1256        }
1257    }
1258
1259    // Can't be a delta if the new is shorter than the existing
1260    if new_len < existing_len {
1261        return DictionaryComparison::NotEqual;
1262    }
1263
1264    // Check for delta
1265    if new.slice(0, existing_len) == *old {
1266        return DictionaryComparison::Delta;
1267    }
1268
1269    DictionaryComparison::NotEqual
1270}
1271
1272/// Arrow File Writer
1273///
1274/// Writes Arrow [`RecordBatch`]es in the [IPC File Format].
1275///
1276/// # See Also
1277///
1278/// * [`StreamWriter`] for writing IPC Streams
1279///
1280/// # Example
1281/// ```
1282/// # use arrow_array::record_batch;
1283/// # use arrow_ipc::writer::FileWriter;
1284/// # let mut file = vec![]; // mimic a file for the example
1285/// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
1286/// // create a new writer, the schema must be known in advance
1287/// let mut writer = FileWriter::try_new(&mut file, &batch.schema()).unwrap();
1288/// // write each batch to the underlying writer
1289/// writer.write(&batch).unwrap();
1290/// // When all batches are written, call finish to flush all buffers
1291/// writer.finish().unwrap();
1292/// ```
1293/// [IPC File Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format
1294pub struct FileWriter<W> {
1295    /// The object to write to
1296    writer: W,
1297    /// IPC write options
1298    write_options: IpcWriteOptions,
1299    /// A reference to the schema, used in validating record batches
1300    schema: SchemaRef,
1301    /// The number of bytes between each block of bytes, as an offset for random access
1302    block_offsets: usize,
1303    /// Dictionary blocks that will be written as part of the IPC footer
1304    dictionary_blocks: Vec<crate::Block>,
1305    /// Record blocks that will be written as part of the IPC footer
1306    record_blocks: Vec<crate::Block>,
1307    /// Whether the writer footer has been written, and the writer is finished
1308    finished: bool,
1309    /// Keeps track of dictionaries that have been written
1310    dictionary_tracker: DictionaryTracker,
1311    /// User level customized metadata
1312    custom_metadata: HashMap<String, String>,
1313
1314    data_gen: IpcDataGenerator,
1315
1316    compression_context: CompressionContext,
1317}
1318
1319impl<W: Write> FileWriter<BufWriter<W>> {
1320    /// Try to create a new file writer with the writer wrapped in a BufWriter.
1321    ///
1322    /// See [`FileWriter::try_new`] for an unbuffered version.
1323    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1324        Self::try_new(BufWriter::new(writer), schema)
1325    }
1326}
1327
1328impl<W: Write> FileWriter<W> {
1329    /// Try to create a new writer, with the schema written as part of the header
1330    ///
1331    /// Note the created writer is not buffered. See [`FileWriter::try_new_buffered`] for details.
1332    ///
1333    /// # Errors
1334    ///
1335    /// An ['Err'](Result::Err) may be returned if writing the header to the writer fails.
1336    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1337        let write_options = IpcWriteOptions::default();
1338        Self::try_new_with_options(writer, schema, write_options)
1339    }
1340
1341    /// Try to create a new writer with IpcWriteOptions
1342    ///
1343    /// Note the created writer is not buffered. See [`FileWriter::try_new_buffered`] for details.
1344    ///
1345    /// # Errors
1346    ///
1347    /// An ['Err'](Result::Err) may be returned if writing the header to the writer fails.
1348    pub fn try_new_with_options(
1349        mut writer: W,
1350        schema: &Schema,
1351        write_options: IpcWriteOptions,
1352    ) -> Result<Self, ArrowError> {
1353        let data_gen = IpcDataGenerator::default();
1354        // write magic to header aligned on alignment boundary
1355        let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
1356        let header_size = super::ARROW_MAGIC.len() + pad_len;
1357        writer.write_all(&super::ARROW_MAGIC)?;
1358        writer.write_all(&PADDING[..pad_len])?;
1359        // write the schema, set the written bytes to the schema + header
1360        let mut dictionary_tracker = DictionaryTracker::new(true);
1361        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
1362            schema,
1363            &mut dictionary_tracker,
1364            &write_options,
1365        );
1366        let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
1367        Ok(Self {
1368            writer,
1369            write_options,
1370            schema: Arc::new(schema.clone()),
1371            block_offsets: meta + data + header_size,
1372            dictionary_blocks: vec![],
1373            record_blocks: vec![],
1374            finished: false,
1375            dictionary_tracker,
1376            custom_metadata: HashMap::new(),
1377            data_gen,
1378            compression_context: CompressionContext::default(),
1379        })
1380    }
1381
1382    /// Adds a key-value pair to the [FileWriter]'s custom metadata
1383    pub fn write_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
1384        self.custom_metadata.insert(key.into(), value.into());
1385    }
1386
1387    /// Write a record batch to the file
1388    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1389        if self.finished {
1390            return Err(ArrowError::IpcError(
1391                "Cannot write record batch to file writer as it is closed".to_string(),
1392            ));
1393        }
1394
1395        let meta = self.data_gen.write(
1396            batch,
1397            &mut self.dictionary_tracker,
1398            &self.write_options,
1399            &mut self.compression_context,
1400            &mut self.writer,
1401        )?;
1402
1403        for (header_len, body_len) in meta.dictionary_block_sizes {
1404            let block = crate::Block::new(
1405                self.block_offsets as i64,
1406                header_len as i32,
1407                body_len as i64,
1408            );
1409            self.dictionary_blocks.push(block);
1410            self.block_offsets += header_len + body_len;
1411        }
1412
1413        // add a record block for the footer
1414        let block = crate::Block::new(
1415            self.block_offsets as i64,
1416            meta.padded_header_len as i32,
1417            meta.body_len as i64,
1418        );
1419        self.record_blocks.push(block);
1420        self.block_offsets += meta.padded_header_len + meta.body_len;
1421        Ok(())
1422    }
1423
1424    /// Write footer and closing tag, then mark the writer as done
1425    pub fn finish(&mut self) -> Result<(), ArrowError> {
1426        if self.finished {
1427            return Err(ArrowError::IpcError(
1428                "Cannot write footer to file writer as it is closed".to_string(),
1429            ));
1430        }
1431
1432        // write EOS
1433        write_continuation(&mut self.writer, &self.write_options, 0)?;
1434
1435        let mut fbb = FlatBufferBuilder::new();
1436        let dictionaries = fbb.create_vector(&self.dictionary_blocks);
1437        let record_batches = fbb.create_vector(&self.record_blocks);
1438
1439        // dictionaries are already written, so we can reset dictionary tracker to reuse for schema
1440        self.dictionary_tracker.clear();
1441        let schema = IpcSchemaEncoder::new()
1442            .with_dictionary_tracker(&mut self.dictionary_tracker)
1443            .schema_to_fb_offset(&mut fbb, &self.schema);
1444        let fb_custom_metadata = (!self.custom_metadata.is_empty())
1445            .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));
1446
1447        let root = {
1448            let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
1449            footer_builder.add_version(self.write_options.metadata_version);
1450            footer_builder.add_schema(schema);
1451            footer_builder.add_dictionaries(dictionaries);
1452            footer_builder.add_recordBatches(record_batches);
1453            if let Some(fb_custom_metadata) = fb_custom_metadata {
1454                footer_builder.add_custom_metadata(fb_custom_metadata);
1455            }
1456            footer_builder.finish()
1457        };
1458        fbb.finish(root, None);
1459        let footer_data = fbb.finished_data();
1460        self.writer.write_all(footer_data)?;
1461        self.writer
1462            .write_all(&(footer_data.len() as i32).to_le_bytes())?;
1463        self.writer.write_all(&super::ARROW_MAGIC)?;
1464        self.writer.flush()?;
1465        self.finished = true;
1466
1467        Ok(())
1468    }
1469
1470    /// Returns the arrow [`SchemaRef`] for this arrow file.
1471    pub fn schema(&self) -> &SchemaRef {
1472        &self.schema
1473    }
1474
1475    /// Gets a reference to the underlying writer.
1476    pub fn get_ref(&self) -> &W {
1477        &self.writer
1478    }
1479
1480    /// Gets a mutable reference to the underlying writer.
1481    ///
1482    /// It is inadvisable to directly write to the underlying writer.
1483    pub fn get_mut(&mut self) -> &mut W {
1484        &mut self.writer
1485    }
1486
1487    /// Flush the underlying writer.
1488    ///
1489    /// Both the BufWriter and the underlying writer are flushed.
1490    pub fn flush(&mut self) -> Result<(), ArrowError> {
1491        self.writer.flush()?;
1492        Ok(())
1493    }
1494
1495    /// Unwraps the underlying writer.
1496    ///
1497    /// The writer is flushed and the FileWriter is finished before returning.
1498    ///
1499    /// # Errors
1500    ///
1501    /// An ['Err'](Result::Err) may be returned if an error occurs while finishing the StreamWriter
1502    /// or while flushing the writer.
1503    pub fn into_inner(mut self) -> Result<W, ArrowError> {
1504        if !self.finished {
1505            // `finish` flushes the writer.
1506            self.finish()?;
1507        }
1508        Ok(self.writer)
1509    }
1510}
1511
1512impl<W: Write> RecordBatchWriter for FileWriter<W> {
1513    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1514        self.write(batch)
1515    }
1516
1517    fn close(mut self) -> Result<(), ArrowError> {
1518        self.finish()
1519    }
1520}
1521
/// Arrow Stream Writer
///
/// Writes Arrow [`RecordBatch`]es to bytes using the [IPC Streaming Format].
///
/// # See Also
///
/// * [`FileWriter`] for writing IPC Files
///
/// # Example - Basic usage
/// ```
/// # use arrow_array::record_batch;
/// # use arrow_ipc::writer::StreamWriter;
/// # let mut stream = vec![]; // mimic a stream for the example
/// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// // create a new writer, the schema must be known in advance
/// let mut writer = StreamWriter::try_new(&mut stream, &batch.schema()).unwrap();
/// // write each batch to the underlying stream
/// writer.write(&batch).unwrap();
/// // When all batches are written, call finish to write the end-of-stream marker and flush
/// writer.finish().unwrap();
/// ```
///
/// # Example - Efficient delta dictionaries
/// ```
/// # use arrow_array::record_batch;
/// # use arrow_ipc::writer::{StreamWriter, IpcWriteOptions};
/// # use arrow_ipc::writer::DictionaryHandling;
/// # use arrow_schema::{DataType, Field, Schema, SchemaRef};
/// # use arrow_array::{
/// #    builder::StringDictionaryBuilder, types::Int32Type, Array, ArrayRef, DictionaryArray,
/// #    RecordBatch, StringArray,
/// # };
/// # use std::sync::Arc;
///
/// let schema = Arc::new(Schema::new(vec![Field::new(
///    "col1",
///    DataType::Dictionary(Box::from(DataType::Int32), Box::from(DataType::Utf8)),
///    true,
/// )]));
///
/// let mut builder = StringDictionaryBuilder::<arrow_array::types::Int32Type>::new();
///
/// // `finish_preserve_values` will keep the dictionary values along with their
/// // key assignments so that they can be re-used in the next batch.
/// builder.append("a").unwrap();
/// builder.append("b").unwrap();
/// let array1 = builder.finish_preserve_values();
/// let batch1 = RecordBatch::try_new(schema.clone(), vec![Arc::new(array1) as ArrayRef]).unwrap();
///
/// // In this batch, 'a' will have the same dictionary key as 'a' in the previous batch,
/// // and 'd' will take the next available key.
/// builder.append("a").unwrap();
/// builder.append("d").unwrap();
/// let array2 = builder.finish_preserve_values();
/// let batch2 = RecordBatch::try_new(schema.clone(), vec![Arc::new(array2) as ArrayRef]).unwrap();
///
/// let mut stream = vec![];
/// // You must set `.with_dictionary_handling(DictionaryHandling::Delta)` to
/// // enable delta dictionaries in the writer
/// let options = IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta);
/// let mut writer = StreamWriter::try_new_with_options(&mut stream, &schema, options).unwrap();
///
/// // When writing the first batch, a dictionary message with 'a' and 'b' will be written
/// // prior to the record batch.
/// writer.write(&batch1).unwrap();
/// // With the second batch only a delta dictionary with 'd' will be written
/// // prior to the record batch. This is only possible with `finish_preserve_values`.
/// // Without it, 'a' and 'd' in this batch would have different keys than the
/// // first batch and so we'd have to send a replacement dictionary with new keys
/// // for both.
/// writer.write(&batch2).unwrap();
/// writer.finish().unwrap();
/// ```
/// [IPC Streaming Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
pub struct StreamWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// Whether the end-of-stream marker has been written by `finish`; once set, further
    /// writes (and a second `finish`) are rejected
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,

    /// Encodes the schema header and the record batches written to this stream
    data_gen: IpcDataGenerator,

    /// Compression state handed to the encoder on every `write`
    /// (NOTE(review): presumably only consulted when `write_options` enables compression)
    compression_context: CompressionContext,
}
1609
1610impl<W: Write> StreamWriter<BufWriter<W>> {
1611    /// Try to create a new stream writer with the writer wrapped in a BufWriter.
1612    ///
1613    /// See [`StreamWriter::try_new`] for an unbuffered version.
1614    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1615        Self::try_new(BufWriter::new(writer), schema)
1616    }
1617}
1618
1619impl<W: Write> StreamWriter<W> {
1620    /// Try to create a new writer, with the schema written as part of the header.
1621    ///
1622    /// Note that there is no internal buffering. See also [`StreamWriter::try_new_buffered`].
1623    ///
1624    /// # Errors
1625    ///
1626    /// An ['Err'](Result::Err) may be returned if writing the header to the writer fails.
1627    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1628        let write_options = IpcWriteOptions::default();
1629        Self::try_new_with_options(writer, schema, write_options)
1630    }
1631
1632    /// Try to create a new writer with [`IpcWriteOptions`].
1633    ///
1634    /// # Errors
1635    ///
1636    /// An ['Err'](Result::Err) may be returned if writing the header to the writer fails.
1637    pub fn try_new_with_options(
1638        mut writer: W,
1639        schema: &Schema,
1640        write_options: IpcWriteOptions,
1641    ) -> Result<Self, ArrowError> {
1642        let data_gen = IpcDataGenerator::default();
1643        let mut dictionary_tracker = DictionaryTracker::new(false);
1644
1645        // write the schema, set the written bytes to the schema
1646        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
1647            schema,
1648            &mut dictionary_tracker,
1649            &write_options,
1650        );
1651        write_message(&mut writer, encoded_message, &write_options)?;
1652        Ok(Self {
1653            writer,
1654            write_options,
1655            finished: false,
1656            dictionary_tracker,
1657            data_gen,
1658            compression_context: CompressionContext::default(),
1659        })
1660    }
1661
1662    /// Write a record batch to the stream
1663    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1664        if self.finished {
1665            return Err(ArrowError::IpcError(
1666                "Cannot write record batch to stream writer as it is closed".to_string(),
1667            ));
1668        }
1669
1670        self.data_gen.write(
1671            batch,
1672            &mut self.dictionary_tracker,
1673            &self.write_options,
1674            &mut self.compression_context,
1675            &mut self.writer,
1676        )?;
1677        Ok(())
1678    }
1679
1680    /// Write continuation bytes, and mark the stream as done
1681    pub fn finish(&mut self) -> Result<(), ArrowError> {
1682        if self.finished {
1683            return Err(ArrowError::IpcError(
1684                "Cannot write footer to stream writer as it is closed".to_string(),
1685            ));
1686        }
1687
1688        write_continuation(&mut self.writer, &self.write_options, 0)?;
1689        self.writer.flush()?;
1690
1691        self.finished = true;
1692
1693        Ok(())
1694    }
1695
1696    /// Gets a reference to the underlying writer.
1697    pub fn get_ref(&self) -> &W {
1698        &self.writer
1699    }
1700
1701    /// Gets a mutable reference to the underlying writer.
1702    ///
1703    /// It is inadvisable to directly write to the underlying writer.
1704    pub fn get_mut(&mut self) -> &mut W {
1705        &mut self.writer
1706    }
1707
1708    /// Flush the underlying writer.
1709    ///
1710    /// Both the BufWriter and the underlying writer are flushed.
1711    pub fn flush(&mut self) -> Result<(), ArrowError> {
1712        self.writer.flush()?;
1713        Ok(())
1714    }
1715
1716    /// Unwraps the the underlying writer.
1717    ///
1718    /// The writer is flushed and the StreamWriter is finished before returning.
1719    ///
1720    /// # Errors
1721    ///
1722    /// An ['Err'](Result::Err) may be returned if an error occurs while finishing the StreamWriter
1723    /// or while flushing the writer.
1724    ///
1725    /// # Example
1726    ///
1727    /// ```
1728    /// # use arrow_ipc::writer::{StreamWriter, IpcWriteOptions};
1729    /// # use arrow_ipc::MetadataVersion;
1730    /// # use arrow_schema::{ArrowError, Schema};
1731    /// # fn main() -> Result<(), ArrowError> {
1732    /// // The result we expect from an empty schema
1733    /// let expected = vec![
1734    ///     255, 255, 255, 255,  48,   0,   0,   0,
1735    ///      16,   0,   0,   0,   0,   0,  10,   0,
1736    ///      12,   0,  10,   0,   9,   0,   4,   0,
1737    ///      10,   0,   0,   0,  16,   0,   0,   0,
1738    ///       0,   1,   4,   0,   8,   0,   8,   0,
1739    ///       0,   0,   4,   0,   8,   0,   0,   0,
1740    ///       4,   0,   0,   0,   0,   0,   0,   0,
1741    ///     255, 255, 255, 255,   0,   0,   0,   0
1742    /// ];
1743    ///
1744    /// let schema = Schema::empty();
1745    /// let buffer: Vec<u8> = Vec::new();
1746    /// let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)?;
1747    /// let stream_writer = StreamWriter::try_new_with_options(buffer, &schema, options)?;
1748    ///
1749    /// assert_eq!(stream_writer.into_inner()?, expected);
1750    /// # Ok(())
1751    /// # }
1752    /// ```
1753    pub fn into_inner(mut self) -> Result<W, ArrowError> {
1754        if !self.finished {
1755            // `finish` flushes.
1756            self.finish()?;
1757        }
1758        Ok(self.writer)
1759    }
1760}
1761
1762impl<W: Write> RecordBatchWriter for StreamWriter<W> {
1763    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1764        self.write(batch)
1765    }
1766
1767    fn close(mut self) -> Result<(), ArrowError> {
1768        self.finish()
1769    }
1770}
1771
/// Stores an encoded IPC message: the [`crate::Message`] flatbuffer and the optional
/// Arrow body data that follows it.
pub struct EncodedData {
    /// An encoded [`crate::Message`] flatbuffer
    pub ipc_message: Vec<u8>,
    /// Arrow buffers to be written, should be an empty vec for schema messages.
    /// `write_message` rejects data whose length is not a multiple of the write alignment.
    pub arrow_data: Vec<u8>,
}
1779/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
1780pub fn write_message<W: Write>(
1781    mut writer: W,
1782    encoded: EncodedData,
1783    write_options: &IpcWriteOptions,
1784) -> Result<(usize, usize), ArrowError> {
1785    let arrow_data_len = encoded.arrow_data.len();
1786    if arrow_data_len % usize::from(write_options.alignment) != 0 {
1787        return Err(ArrowError::MemoryError(
1788            "Arrow data not aligned".to_string(),
1789        ));
1790    }
1791
1792    let a = usize::from(write_options.alignment - 1);
1793    let buffer = encoded.ipc_message;
1794    let flatbuf_size = buffer.len();
1795    let prefix_size = if write_options.write_legacy_ipc_format {
1796        4
1797    } else {
1798        8
1799    };
1800    let aligned_size = (flatbuf_size + prefix_size + a) & !a;
1801    let padding_bytes = aligned_size - flatbuf_size - prefix_size;
1802
1803    write_continuation(
1804        &mut writer,
1805        write_options,
1806        (aligned_size - prefix_size) as i32,
1807    )?;
1808
1809    // write the flatbuf
1810    if flatbuf_size > 0 {
1811        writer.write_all(&buffer)?;
1812    }
1813    // write padding
1814    writer.write_all(&PADDING[..padding_bytes])?;
1815
1816    // write arrow data
1817    let body_len = if arrow_data_len > 0 {
1818        write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
1819    } else {
1820        0
1821    };
1822
1823    Ok((aligned_size, body_len))
1824}
1825
1826fn write_body_buffers<W: Write>(
1827    mut writer: W,
1828    data: &[u8],
1829    alignment: u8,
1830) -> Result<usize, ArrowError> {
1831    let len = data.len();
1832    let pad_len = pad_to_alignment(alignment, len);
1833    let total_len = len + pad_len;
1834
1835    // write body buffer
1836    writer.write_all(data)?;
1837    if pad_len > 0 {
1838        writer.write_all(&PADDING[..pad_len])?;
1839    }
1840
1841    Ok(total_len)
1842}
1843
1844/// Write a record batch to the writer, writing the message size before the message
1845/// if the record batch is being written to a stream
1846fn write_continuation<W: Write>(
1847    mut writer: W,
1848    write_options: &IpcWriteOptions,
1849    total_len: i32,
1850) -> Result<usize, ArrowError> {
1851    let mut written = 8;
1852
1853    // the version of the writer determines whether continuation markers should be added
1854    match write_options.metadata_version {
1855        crate::MetadataVersion::V1 | crate::MetadataVersion::V2 | crate::MetadataVersion::V3 => {
1856            unreachable!("Options with the metadata version cannot be created")
1857        }
1858        crate::MetadataVersion::V4 => {
1859            if !write_options.write_legacy_ipc_format {
1860                // v0.15.0 format
1861                writer.write_all(&CONTINUATION_MARKER)?;
1862                written = 4;
1863            }
1864            writer.write_all(&total_len.to_le_bytes()[..])?;
1865        }
1866        crate::MetadataVersion::V5 => {
1867            // write continuation marker and message length
1868            writer.write_all(&CONTINUATION_MARKER)?;
1869            writer.write_all(&total_len.to_le_bytes()[..])?;
1870        }
1871        z => panic!("Unsupported crate::MetadataVersion {z:?}"),
1872    };
1873
1874    Ok(written)
1875}
1876
1877/// In V4, null types have no validity bitmap
1878/// In V5 and later, null and union types have no validity bitmap
1879/// Run end encoded type has no validity bitmap.
1880fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) -> bool {
1881    if write_options.metadata_version < crate::MetadataVersion::V5 {
1882        !matches!(data_type, DataType::Null)
1883    } else {
1884        !matches!(
1885            data_type,
1886            DataType::Null | DataType::Union(_, _) | DataType::RunEndEncoded(_, _)
1887        )
1888    }
1889}
1890
1891/// Whether to truncate the buffer
1892#[inline]
1893fn buffer_need_truncate(
1894    array_offset: usize,
1895    buffer: &Buffer,
1896    spec: &BufferSpec,
1897    min_length: usize,
1898) -> bool {
1899    spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
1900}
1901
1902/// Returns byte width for a buffer spec. Only for `BufferSpec::FixedWidth`.
1903#[inline]
1904fn get_buffer_element_width(spec: &BufferSpec) -> usize {
1905    match spec {
1906        BufferSpec::FixedWidth { byte_width, .. } => *byte_width,
1907        _ => 0,
1908    }
1909}
1910
1911/// Common functionality for re-encoding offsets. Returns the new offsets as well as
1912/// original start offset and length for use in slicing child data.
1913fn reencode_offsets<O: OffsetSizeTrait>(
1914    offsets: &Buffer,
1915    data: &ArrayData,
1916) -> (Buffer, usize, usize) {
1917    let offsets_slice: &[O] = offsets.typed_data::<O>();
1918    let offset_slice = &offsets_slice[data.offset()..data.offset() + data.len() + 1];
1919
1920    let start_offset = offset_slice.first().unwrap();
1921    let end_offset = offset_slice.last().unwrap();
1922
1923    let offsets = match start_offset.as_usize() {
1924        0 => {
1925            let size = size_of::<O>();
1926            offsets.slice_with_length(data.offset() * size, (data.len() + 1) * size)
1927        }
1928        _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
1929    };
1930
1931    let start_offset = start_offset.as_usize();
1932    let end_offset = end_offset.as_usize();
1933
1934    (offsets, start_offset, end_offset - start_offset)
1935}
1936
1937/// Returns the values and offsets [`Buffer`] for a ByteArray with offset type `O`
1938///
1939/// In particular, this handles re-encoding the offsets if they don't start at `0`,
1940/// slicing the values buffer as appropriate. This helps reduce the encoded
1941/// size of sliced arrays, as values that have been sliced away are not encoded
1942fn get_byte_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Buffer) {
1943    if data.is_empty() {
1944        // As per specification, offsets buffer has N+1 elements.
1945        // So an empty array should still be encoded with a single 0 offset.
1946        let mut offsets = MutableBuffer::new(size_of::<O>());
1947        offsets.extend_from_slice(O::usize_as(0).to_byte_slice());
1948        return (offsets.into(), MutableBuffer::new(0).into());
1949    }
1950
1951    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1952    let values = data.buffers()[1].slice_with_length(original_start_offset, len);
1953    (offsets, values)
1954}
1955
1956/// Similar logic as [`get_byte_array_buffers()`] but slices the child array instead
1957/// of a values buffer.
1958fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, ArrayData) {
1959    if data.is_empty() {
1960        // As per specification, offsets buffer has N+1 elements.
1961        // So an empty array should still be encoded with a single 0 offset.
1962        let mut offsets = MutableBuffer::new(size_of::<O>());
1963        offsets.extend_from_slice(O::usize_as(0).to_byte_slice());
1964        return (offsets.into(), data.child_data()[0].slice(0, 0));
1965    }
1966
1967    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1968    let child_data = data.child_data()[0].slice(original_start_offset, len);
1969    (offsets, child_data)
1970}
1971
1972/// Returns the offsets, sizes, and child data buffers for a ListView array.
1973///
1974/// Unlike List arrays, ListView arrays store both offsets and sizes explicitly,
1975/// and offsets can be non-monotonic. When slicing, we simply pass through the
1976/// offsets and sizes without re-encoding, and do not slice the child data.
1977fn get_list_view_array_buffers<O: OffsetSizeTrait>(
1978    data: &ArrayData,
1979) -> (Buffer, Buffer, ArrayData) {
1980    if data.is_empty() {
1981        return (
1982            MutableBuffer::new(0).into(),
1983            MutableBuffer::new(0).into(),
1984            data.child_data()[0].slice(0, 0),
1985        );
1986    }
1987
1988    let offsets = &data.buffers()[0];
1989    let sizes = &data.buffers()[1];
1990
1991    let element_size = std::mem::size_of::<O>();
1992    let offsets_slice =
1993        offsets.slice_with_length(data.offset() * element_size, data.len() * element_size);
1994    let sizes_slice =
1995        sizes.slice_with_length(data.offset() * element_size, data.len() * element_size);
1996
1997    let child_data = data.child_data()[0].clone();
1998
1999    (offsets_slice, sizes_slice, child_data)
2000}
2001
2002/// Returns the sliced views [`Buffer`] for a BinaryView/Utf8View array.
2003///
2004/// The views buffer is sliced to only include views in the valid range based on
2005/// the array's offset and length. This helps reduce the encoded size of sliced
2006/// arrays
2007///
2008fn get_or_truncate_buffer(array_data: &ArrayData) -> Buffer {
2009    let buffer = &array_data.buffers()[0];
2010    let layout = layout(array_data.data_type());
2011    let spec = &layout.buffers[0];
2012
2013    let byte_width = get_buffer_element_width(spec);
2014    let min_length = array_data.len() * byte_width;
2015    if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
2016        let byte_offset = array_data.offset() * byte_width;
2017        let buffer_length = min(min_length, buffer.len() - byte_offset);
2018        buffer.slice_with_length(byte_offset, buffer_length)
2019    } else {
2020        buffer.clone()
2021    }
2022}
2023
/// Recursively encodes `array_data` into its IPC representation.
///
/// Output goes to two separate channels:
/// - `meta`: accumulates IPC metadata (`nodes` and `buffers`) for the flatbuffer header.
/// - `sink`: the raw Arrow data bytes that form the IPC message body.
///
/// # Emission order
///
/// The order below is the wire order, so it must not change:
/// 1. the field node for `array_data`;
/// 2. its buffers, validity bitmap first when `has_validity_bitmap` says the type has one;
/// 3. the nodes and buffers of its children, depth first.
///
/// # Arguments
///
/// - `offset`: running byte offset into the IPC message body at which the next buffer
///   is placed.
/// - `compression_codec` / `compression_context`: forwarded to `encode_sink_buffer` for
///   every buffer, and to the recursive calls for children.
/// - `write_options`: decides validity-bitmap presence (via the metadata version) and
///   buffer alignment.
///
/// Returns the body offset after the last buffer written for this array and its
/// descendants.
///
/// # Slicing behaviour
///
/// Where it is cheap, sliced arrays are truncated so that data outside the slice is not
/// written:
/// - offsets are re-based;
/// - value buffers are sliced;
/// - List/LargeList/Map/FixedSizeList children are sliced.
///
/// The following are written as-is: ListView children, BinaryView/Utf8View data buffers,
/// and the buffers of the fall-through branch.
///
/// # Errors
///
/// Propagates errors from `encode_sink_buffer` and `unslice_run_array`.
///
/// # Panics
///
/// Panics (via `assert_eq!`) if the number of buffers or children of certain types does
/// not match the layout expected here.
fn write_array_data(
    array_data: &ArrayData,
    meta: &mut IpcMetadataBuilder,
    sink: &mut IpcBodySink<'_>,
    offset: i64,
    compression_codec: Option<CompressionCodec>,
    compression_context: &mut CompressionContext,
    write_options: &IpcWriteOptions,
) -> Result<i64, ArrowError> {
    let mut offset = offset;
    let num_rows = array_data.len();
    // One field node per array, pushed before any of its buffers or children.
    if !matches!(array_data.data_type(), DataType::Null) {
        meta.nodes.push(crate::FieldNode::new(
            num_rows as i64,
            array_data.null_count() as i64,
        ));
    } else {
        // NullArray's null_count equals to len, but ArrayData null_count is always 0.
        meta.nodes
            .push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
    }
    if has_validity_bitmap(array_data.data_type(), write_options) {
        // write null buffer if exists
        let null_buffer = match array_data.nulls() {
            None => {
                // create a buffer and fill it with valid bits
                let num_bytes = bit_util::ceil(num_rows, 8);
                let buffer = MutableBuffer::new(num_bytes);
                let buffer = buffer.with_bitset(num_bytes, true);
                buffer.into()
            }
            Some(buffer) => buffer.inner().sliced(),
        };

        offset = encode_sink_buffer(
            null_buffer,
            meta,
            sink,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;
    }

    // Data buffers, chosen by the physical layout of the type.
    let data_type = array_data.data_type();
    if matches!(data_type, DataType::Binary | DataType::Utf8) {
        let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
        for buffer in [offsets, values] {
            offset = encode_sink_buffer(
                buffer,
                meta,
                sink,
                offset,
                compression_codec,
                compression_context,
                write_options.alignment,
            )?;
        }
    } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
        // Slicing the views buffer is safe and easy,
        // but pruning unneeded data buffers is much more nuanced since it's complicated to prove that no views reference the pruned buffers
        //
        // Current implementation just serialize the raw arrays as given and not try to optimize anything.
        // If users wants to "compact" the arrays prior to sending them over IPC,
        // they should consider the gc API suggested in #5513
        let views = get_or_truncate_buffer(array_data);
        offset = encode_sink_buffer(
            views,
            meta,
            sink,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;

        // Variadic data buffers (everything after the views buffer), written unsliced.
        for buffer in array_data.buffers().iter().skip(1) {
            offset = encode_sink_buffer(
                buffer.clone(),
                meta,
                sink,
                offset,
                compression_codec,
                compression_context,
                write_options.alignment,
            )?;
        }
    } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
        let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
        for buffer in [offsets, values] {
            offset = encode_sink_buffer(
                buffer,
                meta,
                sink,
                offset,
                compression_codec,
                compression_context,
                write_options.alignment,
            )?;
        }
    } else if DataType::is_numeric(data_type)
        || DataType::is_temporal(data_type)
        || matches!(
            array_data.data_type(),
            DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
        )
    {
        // Truncate values
        // (for dictionaries this single buffer holds the keys)
        assert_eq!(array_data.buffers().len(), 1);

        let buffer = get_or_truncate_buffer(array_data);
        offset = encode_sink_buffer(
            buffer,
            meta,
            sink,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;
    } else if matches!(data_type, DataType::Boolean) {
        // Bools are special because the payload (= 1 bit) is smaller than the physical container elements (= bytes).
        // The array data may not start at the physical boundary of the underlying buffer, so we need to shift bits around.
        assert_eq!(array_data.buffers().len(), 1);

        let buffer = &array_data.buffers()[0];
        let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
        offset = encode_sink_buffer(
            buffer,
            meta,
            sink,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;
    } else if matches!(
        data_type,
        DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _)
    ) {
        assert_eq!(array_data.buffers().len(), 1);
        assert_eq!(array_data.child_data().len(), 1);

        // Truncate offsets and the child data to avoid writing unnecessary data
        let (offsets, sliced_child_data) = match data_type {
            DataType::List(_) => get_list_array_buffers::<i32>(array_data),
            DataType::Map(_, _) => get_list_array_buffers::<i32>(array_data),
            DataType::LargeList(_) => get_list_array_buffers::<i64>(array_data),
            _ => unreachable!(),
        };
        offset = encode_sink_buffer(
            offsets,
            meta,
            sink,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;
        offset = write_array_data(
            &sliced_child_data,
            meta,
            sink,
            offset,
            compression_codec,
            compression_context,
            write_options,
        )?;
        // The sliced child is already written; skip the generic child recursion below.
        return Ok(offset);
    } else if matches!(
        data_type,
        DataType::ListView(_) | DataType::LargeListView(_)
    ) {
        assert_eq!(array_data.buffers().len(), 2); // offsets + sizes
        assert_eq!(array_data.child_data().len(), 1);

        let (offsets, sizes, child_data) = match data_type {
            DataType::ListView(_) => get_list_view_array_buffers::<i32>(array_data),
            DataType::LargeListView(_) => get_list_view_array_buffers::<i64>(array_data),
            _ => unreachable!(),
        };

        offset = encode_sink_buffer(
            offsets,
            meta,
            sink,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;
        offset = encode_sink_buffer(
            sizes,
            meta,
            sink,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;

        offset = write_array_data(
            &child_data,
            meta,
            sink,
            offset,
            compression_codec,
            compression_context,
            write_options,
        )?;
        // The child is already written; skip the generic child recursion below.
        return Ok(offset);
    } else if let DataType::FixedSizeList(_, fixed_size) = data_type {
        assert_eq!(array_data.child_data().len(), 1);
        let fixed_size = *fixed_size as usize;

        // Each parent slot owns exactly `fixed_size` child slots, so the child window
        // is the parent window scaled by `fixed_size`.
        let child_offset = array_data.offset() * fixed_size;
        let child_length = array_data.len() * fixed_size;
        let child_data = array_data.child_data()[0].slice(child_offset, child_length);

        offset = write_array_data(
            &child_data,
            meta,
            sink,
            offset,
            compression_codec,
            compression_context,
            write_options,
        )?;
        // The sliced child is already written; skip the generic child recursion below.
        return Ok(offset);
    } else {
        // Fall-through (e.g. Struct, Union, RunEndEncoded, Null): write any buffers as-is.
        for buffer in array_data.buffers() {
            offset = encode_sink_buffer(
                buffer.clone(),
                meta,
                sink,
                offset,
                compression_codec,
                compression_context,
                write_options.alignment,
            )?;
        }
    }

    match array_data.data_type() {
        // Only the keys are part of this body; dictionary values are sent as
        // separate dictionary messages.
        DataType::Dictionary(_, _) => {}
        DataType::RunEndEncoded(_, _) => {
            // unslice the run encoded array.
            let arr = unslice_run_array(array_data.clone())?;
            // recursively write out nested structures
            for data_ref in arr.child_data() {
                // write the nested data (e.g list data)
                offset = write_array_data(
                    data_ref,
                    meta,
                    sink,
                    offset,
                    compression_codec,
                    compression_context,
                    write_options,
                )?;
            }
        }
        _ => {
            // recursively write out nested structures
            for data_ref in array_data.child_data() {
                // write the nested data (e.g list data)
                offset = write_array_data(
                    data_ref,
                    meta,
                    sink,
                    offset,
                    compression_codec,
                    compression_context,
                    write_options,
                )?;
            }
        }
    }
    Ok(offset)
}
2310
2311/// Encodes a single Arrow [`Buffer`] into the IPC body and records its metadata.
2312///
2313/// - `buffer`: the Arrow data buffer to encode (validity bitmap, offsets, values, etc.)
2314/// - `buffers`: in-progress list of IPC `Buffer` metadata entries (body offset + length) that
2315///   will eventually be serialised into the flatbuffer `RecordBatch` header.
2316/// - `sink`: destination for the actual encoded bytes; either a contiguous `Vec<u8>` for
2317///   in-memory writes, or a list of [`EncodedBuffer`] segments for deferred zero-copy streaming.
2318/// - `offset`: running byte offset into the IPC message body, used to compute the metadata entry.
2319/// - `compression_codec` / `compression_context`: if `Some`, the buffer is compressed before
2320///   writing; `compression_context` provides reusable scratch space across calls.
2321/// - `alignment`: each buffer is padded to this many bytes so the next buffer starts aligned.
2322///
2323/// Returns the updated `offset` (advanced by the encoded length plus any alignment padding).
2324fn encode_sink_buffer(
2325    buffer: Buffer,
2326    ipc_meta_data: &mut IpcMetadataBuilder,
2327    sink: &mut IpcBodySink<'_>,
2328    offset: i64,
2329    compression_codec: Option<CompressionCodec>,
2330    compression_context: &mut CompressionContext,
2331    alignment: u8,
2332) -> Result<i64, ArrowError> {
2333    let (encoded, len) = match compression_codec {
2334        None => {
2335            let len = buffer.len() as i64;
2336            (EncodedBuffer::Raw(buffer), len)
2337        }
2338        Some(codec) => {
2339            let mut scratch = Vec::new();
2340            let written =
2341                codec.compress_to_vec(buffer.as_slice(), &mut scratch, compression_context)?;
2342            let len = i64::try_from(written)
2343                .map_err(|e| ArrowError::InvalidArgumentError(format!("{e}")))?;
2344            (EncodedBuffer::Compressed(scratch), len)
2345        }
2346    };
2347
2348    let pad_len = pad_to_alignment(alignment, len as usize);
2349    sink.write(pad_len, encoded);
2350    ipc_meta_data.buffers.push(crate::Buffer::new(offset, len));
2351    Ok(offset + len + pad_len as i64)
2352}
2353
/// Zero bytes used to pad IPC metadata and body buffers up to the write alignment.
/// Padding runs are sliced from this array, so a run longer than 64 bytes would panic.
const PADDING: [u8; 64] = [0; 64];
2355
2356/// Estimates the number of [`EncodedBuffer`] segments that [`write_array_data`]
2357/// will produce for a column of the given type.
2358///
2359/// Based on the Arrow IPC buffer layout
2360/// (<https://arrow.apache.org/docs/format/Columnar.html#recordbatch-message>):
2361#[inline]
2362fn estimate_encoded_buffer_count(dt: &DataType) -> usize {
2363    match dt {
2364        DataType::Null => 0,
2365
2366        DataType::Binary | DataType::Utf8 | DataType::LargeBinary | DataType::LargeUtf8 => 3,
2367
2368        DataType::BinaryView | DataType::Utf8View => 3,
2369
2370        DataType::List(f) | DataType::LargeList(f) | DataType::Map(f, _) => {
2371            2 + estimate_encoded_buffer_count(f.data_type())
2372        }
2373
2374        DataType::ListView(f) | DataType::LargeListView(f) => {
2375            3 + estimate_encoded_buffer_count(f.data_type())
2376        }
2377
2378        DataType::FixedSizeList(f, _) => 1 + estimate_encoded_buffer_count(f.data_type()),
2379
2380        DataType::Struct(fields) => {
2381            1 + fields
2382                .iter()
2383                .map(|f| estimate_encoded_buffer_count(f.data_type()))
2384                .sum::<usize>()
2385        }
2386
2387        // Dictionary indices only; dictionary body is a separate IPC message.
2388        DataType::Dictionary(_, _) => 2,
2389
2390        DataType::Union(fields, UnionMode::Sparse) => {
2391            1 + fields
2392                .iter()
2393                .map(|(_, f)| estimate_encoded_buffer_count(f.data_type()))
2394                .sum::<usize>()
2395        }
2396        DataType::Union(fields, UnionMode::Dense) => {
2397            2 + fields
2398                .iter()
2399                .map(|(_, f)| estimate_encoded_buffer_count(f.data_type()))
2400                .sum::<usize>()
2401        }
2402
2403        DataType::RunEndEncoded(run_ends, values) => {
2404            estimate_encoded_buffer_count(run_ends.data_type())
2405                + estimate_encoded_buffer_count(values.data_type())
2406        }
2407        // Primitive, Bool, temporal, Decimal*, FixedSizeBinary: validity + values.
2408        _ => 2,
2409    }
2410}
2411
/// Returns the number of bytes needed to pad `len` up to the next multiple of `alignment`.
///
/// An `alignment` of `0` or `1` imposes no constraint and yields `0`. Power-of-two
/// alignments (the only ones the IPC write options produce) use a branch-free mask. Any
/// other value falls back to division, so the result is correct for every `alignment`
/// instead of silently wrong for non-powers of two.
#[inline]
fn pad_to_alignment(alignment: u8, len: usize) -> usize {
    let align = usize::from(alignment);
    if align <= 1 {
        // Also avoids the `alignment - 1` underflow a zero alignment would cause.
        return 0;
    }
    if align.is_power_of_two() {
        // `(-len) mod align`. Wrapping negation cannot overflow, unlike `len + (align - 1)`.
        len.wrapping_neg() & (align - 1)
    } else {
        len.next_multiple_of(align) - len
    }
}
2418
2419#[cfg(test)]
2420mod tests {
2421    use std::hash::Hasher;
2422    use std::io::Cursor;
2423    use std::io::Seek;
2424
2425    use arrow_array::builder::FixedSizeListBuilder;
2426    use arrow_array::builder::Float32Builder;
2427    use arrow_array::builder::Int64Builder;
2428    use arrow_array::builder::MapBuilder;
2429    use arrow_array::builder::StringViewBuilder;
2430    use arrow_array::builder::UnionBuilder;
2431    use arrow_array::builder::{
2432        GenericListBuilder, GenericListViewBuilder, ListBuilder, StringBuilder,
2433    };
2434    use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
2435    use arrow_array::types::*;
2436    use arrow_buffer::ScalarBuffer;
2437
2438    use crate::MetadataVersion;
2439    use crate::convert::fb_to_schema;
2440    use crate::reader::*;
2441    use crate::root_as_footer;
2442
2443    use super::*;
2444
2445    fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
2446        let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
2447        writer.write(rb).unwrap();
2448        writer.finish().unwrap();
2449        writer.into_inner().unwrap()
2450    }
2451
2452    fn deserialize_file(bytes: Vec<u8>) -> RecordBatch {
2453        let mut reader = FileReader::try_new(Cursor::new(bytes), None).unwrap();
2454        reader.next().unwrap().unwrap()
2455    }
2456
2457    fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
2458        // Use 8-byte alignment so that the various `truncate_*` tests can be compactly written,
2459        // without needing to construct a giant array to spill over the 64-byte default alignment
2460        // boundary.
2461        const IPC_ALIGNMENT: usize = 8;
2462
2463        let mut stream_writer = StreamWriter::try_new_with_options(
2464            vec![],
2465            record.schema_ref(),
2466            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
2467        )
2468        .unwrap();
2469        stream_writer.write(record).unwrap();
2470        stream_writer.finish().unwrap();
2471        stream_writer.into_inner().unwrap()
2472    }
2473
2474    fn deserialize_stream(bytes: Vec<u8>) -> RecordBatch {
2475        let mut stream_reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
2476        stream_reader.next().unwrap().unwrap()
2477    }
2478
2479    #[test]
2480    #[cfg(feature = "lz4")]
2481    fn test_write_empty_record_batch_lz4_compression() {
2482        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2483        let values: Vec<Option<i32>> = vec![];
2484        let array = Int32Array::from(values);
2485        let record_batch =
2486            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2487
2488        let mut file = tempfile::tempfile().unwrap();
2489
2490        {
2491            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2492                .unwrap()
2493                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
2494                .unwrap();
2495
2496            let mut writer =
2497                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2498            writer.write(&record_batch).unwrap();
2499            writer.finish().unwrap();
2500        }
2501        file.rewind().unwrap();
2502        {
2503            // read file
2504            let reader = FileReader::try_new(file, None).unwrap();
2505            for read_batch in reader {
2506                read_batch
2507                    .unwrap()
2508                    .columns()
2509                    .iter()
2510                    .zip(record_batch.columns())
2511                    .for_each(|(a, b)| {
2512                        assert_eq!(a.data_type(), b.data_type());
2513                        assert_eq!(a.len(), b.len());
2514                        assert_eq!(a.null_count(), b.null_count());
2515                    });
2516            }
2517        }
2518    }
2519
2520    #[test]
2521    #[cfg(feature = "lz4")]
2522    fn test_write_file_with_lz4_compression() {
2523        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2524        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
2525        let array = Int32Array::from(values);
2526        let record_batch =
2527            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2528
2529        let mut file = tempfile::tempfile().unwrap();
2530        {
2531            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2532                .unwrap()
2533                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
2534                .unwrap();
2535
2536            let mut writer =
2537                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2538            writer.write(&record_batch).unwrap();
2539            writer.finish().unwrap();
2540        }
2541        file.rewind().unwrap();
2542        {
2543            // read file
2544            let reader = FileReader::try_new(file, None).unwrap();
2545            for read_batch in reader {
2546                read_batch
2547                    .unwrap()
2548                    .columns()
2549                    .iter()
2550                    .zip(record_batch.columns())
2551                    .for_each(|(a, b)| {
2552                        assert_eq!(a.data_type(), b.data_type());
2553                        assert_eq!(a.len(), b.len());
2554                        assert_eq!(a.null_count(), b.null_count());
2555                    });
2556            }
2557        }
2558    }
2559
2560    #[test]
2561    #[cfg(feature = "zstd")]
2562    fn test_write_file_with_zstd_compression() {
2563        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2564        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
2565        let array = Int32Array::from(values);
2566        let record_batch =
2567            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2568        let mut file = tempfile::tempfile().unwrap();
2569        {
2570            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2571                .unwrap()
2572                .try_with_compression(Some(crate::CompressionType::ZSTD))
2573                .unwrap()
2574                .try_with_compression_level(Some(1))
2575                .unwrap();
2576
2577            let mut writer =
2578                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2579            writer.write(&record_batch).unwrap();
2580            writer.finish().unwrap();
2581        }
2582        file.rewind().unwrap();
2583        {
2584            // read file
2585            let reader = FileReader::try_new(file, None).unwrap();
2586            for read_batch in reader {
2587                read_batch
2588                    .unwrap()
2589                    .columns()
2590                    .iter()
2591                    .zip(record_batch.columns())
2592                    .for_each(|(a, b)| {
2593                        assert_eq!(a.data_type(), b.data_type());
2594                        assert_eq!(a.len(), b.len());
2595                        assert_eq!(a.null_count(), b.null_count());
2596                    });
2597            }
2598        }
2599    }
2600
2601    #[test]
2602    fn test_write_file() {
2603        let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
2604        let values: Vec<Option<u32>> = vec![
2605            Some(999),
2606            None,
2607            Some(235),
2608            Some(123),
2609            None,
2610            None,
2611            None,
2612            None,
2613            None,
2614        ];
2615        let array1 = UInt32Array::from(values);
2616        let batch =
2617            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array1) as ArrayRef])
2618                .unwrap();
2619        let mut file = tempfile::tempfile().unwrap();
2620        {
2621            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2622
2623            writer.write(&batch).unwrap();
2624            writer.finish().unwrap();
2625        }
2626        file.rewind().unwrap();
2627
2628        {
2629            let mut reader = FileReader::try_new(file, None).unwrap();
2630            while let Some(Ok(read_batch)) = reader.next() {
2631                read_batch
2632                    .columns()
2633                    .iter()
2634                    .zip(batch.columns())
2635                    .for_each(|(a, b)| {
2636                        assert_eq!(a.data_type(), b.data_type());
2637                        assert_eq!(a.len(), b.len());
2638                        assert_eq!(a.null_count(), b.null_count());
2639                    });
2640            }
2641        }
2642    }
2643
2644    #[test]
2645    fn test_empty_utf8_ipc_writes_nonempty_offsets_buffer() {
2646        let name = StringArray::from(Vec::<String>::new());
2647        let (offsets, values) = get_byte_array_buffers::<i32>(&name.to_data());
2648
2649        assert_eq!(name.len(), 0);
2650        assert_eq!(
2651            offsets.len(),
2652            std::mem::size_of::<i32>(),
2653            "offsets buffer should contain one zero i32 offset"
2654        );
2655        assert_eq!(values.len(), 0, "values buffer should remain empty");
2656    }
2657
2658    #[test]
2659    fn test_empty_large_utf8_ipc_writes_nonempty_offsets_buffer() {
2660        let name = LargeStringArray::from(Vec::<String>::new());
2661        let (offsets, values) = get_byte_array_buffers::<i64>(&name.to_data());
2662
2663        assert_eq!(name.len(), 0);
2664        assert_eq!(
2665            offsets.len(),
2666            std::mem::size_of::<i64>(),
2667            "offsets buffer should contain one zero i64 offset"
2668        );
2669        assert_eq!(values.len(), 0, "values buffer should remain empty");
2670    }
2671
2672    #[test]
2673    fn test_empty_list_ipc_writes_nonempty_offsets_buffer() {
2674        let list = GenericListBuilder::<i32, _>::new(UInt32Builder::new()).finish();
2675        let (offsets, child_data) = get_list_array_buffers::<i32>(&list.to_data());
2676
2677        assert_eq!(list.len(), 0);
2678        assert_eq!(
2679            offsets.len(),
2680            std::mem::size_of::<i32>(),
2681            "offsets buffer should contain one zero i32 offset"
2682        );
2683        assert_eq!(child_data.len(), 0, "child data should remain empty");
2684    }
2685
2686    #[test]
2687    fn test_empty_large_list_ipc_writes_nonempty_offsets_buffer() {
2688        let list = GenericListBuilder::<i64, _>::new(UInt32Builder::new()).finish();
2689        let (offsets, child_data) = get_list_array_buffers::<i64>(&list.to_data());
2690
2691        assert_eq!(list.len(), 0);
2692        assert_eq!(
2693            offsets.len(),
2694            std::mem::size_of::<i64>(),
2695            "offsets buffer should contain one zero i64 offset"
2696        );
2697        assert_eq!(child_data.len(), 0, "child data should remain empty");
2698    }
2699
2700    fn write_null_file(options: IpcWriteOptions) {
2701        let schema = Schema::new(vec![
2702            Field::new("nulls", DataType::Null, true),
2703            Field::new("int32s", DataType::Int32, false),
2704            Field::new("nulls2", DataType::Null, true),
2705            Field::new("f64s", DataType::Float64, false),
2706        ]);
2707        let array1 = NullArray::new(32);
2708        let array2 = Int32Array::from(vec![1; 32]);
2709        let array3 = NullArray::new(32);
2710        let array4 = Float64Array::from(vec![f64::NAN; 32]);
2711        let batch = RecordBatch::try_new(
2712            Arc::new(schema.clone()),
2713            vec![
2714                Arc::new(array1) as ArrayRef,
2715                Arc::new(array2) as ArrayRef,
2716                Arc::new(array3) as ArrayRef,
2717                Arc::new(array4) as ArrayRef,
2718            ],
2719        )
2720        .unwrap();
2721        let mut file = tempfile::tempfile().unwrap();
2722        {
2723            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2724
2725            writer.write(&batch).unwrap();
2726            writer.finish().unwrap();
2727        }
2728
2729        file.rewind().unwrap();
2730
2731        {
2732            let reader = FileReader::try_new(file, None).unwrap();
2733            reader.for_each(|maybe_batch| {
2734                maybe_batch
2735                    .unwrap()
2736                    .columns()
2737                    .iter()
2738                    .zip(batch.columns())
2739                    .for_each(|(a, b)| {
2740                        assert_eq!(a.data_type(), b.data_type());
2741                        assert_eq!(a.len(), b.len());
2742                        assert_eq!(a.null_count(), b.null_count());
2743                    });
2744            });
2745        }
2746    }
2747    #[test]
2748    fn test_write_null_file_v4() {
2749        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2750        write_null_file(IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap());
2751        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V4).unwrap());
2752        write_null_file(IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap());
2753    }
2754
2755    #[test]
2756    fn test_write_null_file_v5() {
2757        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2758        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V5).unwrap());
2759    }
2760
2761    #[test]
2762    fn track_union_nested_dict() {
2763        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2764
2765        let array = Arc::new(inner) as ArrayRef;
2766
2767        // Dict field with id 2
2768        #[allow(deprecated)]
2769        let dctfield = Field::new_dict("dict", array.data_type().clone(), false, 0, false);
2770        let union_fields = [(0, Arc::new(dctfield))].into_iter().collect();
2771
2772        let types = [0, 0, 0].into_iter().collect::<ScalarBuffer<i8>>();
2773        let offsets = [0, 1, 2].into_iter().collect::<ScalarBuffer<i32>>();
2774
2775        let union = UnionArray::try_new(union_fields, types, Some(offsets), vec![array]).unwrap();
2776
2777        let schema = Arc::new(Schema::new(vec![Field::new(
2778            "union",
2779            union.data_type().clone(),
2780            false,
2781        )]));
2782
2783        let r#gen = IpcDataGenerator::default();
2784        let mut dict_tracker = DictionaryTracker::new(false);
2785        r#gen.schema_to_bytes_with_dictionary_tracker(
2786            &schema,
2787            &mut dict_tracker,
2788            &IpcWriteOptions::default(),
2789        );
2790
2791        let batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
2792
2793        r#gen
2794            .encode(
2795                &batch,
2796                &mut dict_tracker,
2797                &Default::default(),
2798                &mut Default::default(),
2799            )
2800            .unwrap();
2801
2802        // The encoder will assign dict IDs itself to ensure uniqueness and ignore the dict ID in the schema
2803        // so we expect the dict will be keyed to 0
2804        assert!(dict_tracker.written.contains_key(&0));
2805    }
2806
2807    #[test]
2808    fn track_struct_nested_dict() {
2809        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2810
2811        let array = Arc::new(inner) as ArrayRef;
2812
2813        // Dict field with id 2
2814        #[allow(deprecated)]
2815        let dctfield = Arc::new(Field::new_dict(
2816            "dict",
2817            array.data_type().clone(),
2818            false,
2819            2,
2820            false,
2821        ));
2822
2823        let s = StructArray::from(vec![(dctfield, array)]);
2824        let struct_array = Arc::new(s) as ArrayRef;
2825
2826        let schema = Arc::new(Schema::new(vec![Field::new(
2827            "struct",
2828            struct_array.data_type().clone(),
2829            false,
2830        )]));
2831
2832        let r#gen = IpcDataGenerator::default();
2833        let mut dict_tracker = DictionaryTracker::new(false);
2834        r#gen.schema_to_bytes_with_dictionary_tracker(
2835            &schema,
2836            &mut dict_tracker,
2837            &IpcWriteOptions::default(),
2838        );
2839
2840        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2841
2842        r#gen
2843            .encode(
2844                &batch,
2845                &mut dict_tracker,
2846                &Default::default(),
2847                &mut Default::default(),
2848            )
2849            .unwrap();
2850
2851        assert!(dict_tracker.written.contains_key(&0));
2852    }
2853
2854    fn write_union_file(options: IpcWriteOptions) {
2855        let schema = Schema::new(vec![Field::new_union(
2856            "union",
2857            vec![0, 1],
2858            vec![
2859                Field::new("a", DataType::Int32, false),
2860                Field::new("c", DataType::Float64, false),
2861            ],
2862            UnionMode::Sparse,
2863        )]);
2864        let mut builder = UnionBuilder::with_capacity_sparse(5);
2865        builder.append::<Int32Type>("a", 1).unwrap();
2866        builder.append_null::<Int32Type>("a").unwrap();
2867        builder.append::<Float64Type>("c", 3.0).unwrap();
2868        builder.append_null::<Float64Type>("c").unwrap();
2869        builder.append::<Int32Type>("a", 4).unwrap();
2870        let union = builder.build().unwrap();
2871
2872        let batch =
2873            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(union) as ArrayRef])
2874                .unwrap();
2875
2876        let mut file = tempfile::tempfile().unwrap();
2877        {
2878            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2879
2880            writer.write(&batch).unwrap();
2881            writer.finish().unwrap();
2882        }
2883        file.rewind().unwrap();
2884
2885        {
2886            let reader = FileReader::try_new(file, None).unwrap();
2887            reader.for_each(|maybe_batch| {
2888                maybe_batch
2889                    .unwrap()
2890                    .columns()
2891                    .iter()
2892                    .zip(batch.columns())
2893                    .for_each(|(a, b)| {
2894                        assert_eq!(a.data_type(), b.data_type());
2895                        assert_eq!(a.len(), b.len());
2896                        assert_eq!(a.null_count(), b.null_count());
2897                    });
2898            });
2899        }
2900    }
2901
2902    #[test]
2903    fn test_write_union_file_v4_v5() {
2904        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2905        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2906    }
2907
2908    #[test]
2909    fn test_write_view_types() {
2910        const LONG_TEST_STRING: &str =
2911            "This is a long string to make sure binary view array handles it";
2912        let schema = Schema::new(vec![
2913            Field::new("field1", DataType::BinaryView, true),
2914            Field::new("field2", DataType::Utf8View, true),
2915        ]);
2916        let values: Vec<Option<&[u8]>> = vec![
2917            Some(b"foo"),
2918            Some(b"bar"),
2919            Some(LONG_TEST_STRING.as_bytes()),
2920        ];
2921        let binary_array = BinaryViewArray::from_iter(values);
2922        let utf8_array =
2923            StringViewArray::from_iter(vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]);
2924        let record_batch = RecordBatch::try_new(
2925            Arc::new(schema.clone()),
2926            vec![Arc::new(binary_array), Arc::new(utf8_array)],
2927        )
2928        .unwrap();
2929
2930        let mut file = tempfile::tempfile().unwrap();
2931        {
2932            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2933            writer.write(&record_batch).unwrap();
2934            writer.finish().unwrap();
2935        }
2936        file.rewind().unwrap();
2937        {
2938            let mut reader = FileReader::try_new(&file, None).unwrap();
2939            let read_batch = reader.next().unwrap().unwrap();
2940            read_batch
2941                .columns()
2942                .iter()
2943                .zip(record_batch.columns())
2944                .for_each(|(a, b)| {
2945                    assert_eq!(a, b);
2946                });
2947        }
2948        file.rewind().unwrap();
2949        {
2950            let mut reader = FileReader::try_new(&file, Some(vec![0])).unwrap();
2951            let read_batch = reader.next().unwrap().unwrap();
2952            assert_eq!(read_batch.num_columns(), 1);
2953            let read_array = read_batch.column(0);
2954            let write_array = record_batch.column(0);
2955            assert_eq!(read_array, write_array);
2956        }
2957    }
2958
2959    #[test]
2960    fn truncate_ipc_record_batch() {
2961        fn create_batch(rows: usize) -> RecordBatch {
2962            let schema = Schema::new(vec![
2963                Field::new("a", DataType::Int32, false),
2964                Field::new("b", DataType::Utf8, false),
2965            ]);
2966
2967            let a = Int32Array::from_iter_values(0..rows as i32);
2968            let b = StringArray::from_iter_values((0..rows).map(|i| i.to_string()));
2969
2970            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
2971        }
2972
2973        let big_record_batch = create_batch(65536);
2974
2975        let length = 5;
2976        let small_record_batch = create_batch(length);
2977
2978        let offset = 2;
2979        let record_batch_slice = big_record_batch.slice(offset, length);
2980        assert!(
2981            serialize_stream(&big_record_batch).len() > serialize_stream(&small_record_batch).len()
2982        );
2983        assert_eq!(
2984            serialize_stream(&small_record_batch).len(),
2985            serialize_stream(&record_batch_slice).len()
2986        );
2987
2988        assert_eq!(
2989            deserialize_stream(serialize_stream(&record_batch_slice)),
2990            record_batch_slice
2991        );
2992    }
2993
2994    #[test]
2995    fn truncate_ipc_record_batch_with_nulls() {
2996        fn create_batch() -> RecordBatch {
2997            let schema = Schema::new(vec![
2998                Field::new("a", DataType::Int32, true),
2999                Field::new("b", DataType::Utf8, true),
3000            ]);
3001
3002            let a = Int32Array::from(vec![Some(1), None, Some(1), None, Some(1)]);
3003            let b = StringArray::from(vec![None, Some("a"), Some("a"), None, Some("a")]);
3004
3005            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
3006        }
3007
3008        let record_batch = create_batch();
3009        let record_batch_slice = record_batch.slice(1, 2);
3010        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
3011
3012        assert!(
3013            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
3014        );
3015
3016        assert!(deserialized_batch.column(0).is_null(0));
3017        assert!(deserialized_batch.column(0).is_valid(1));
3018        assert!(deserialized_batch.column(1).is_valid(0));
3019        assert!(deserialized_batch.column(1).is_valid(1));
3020
3021        assert_eq!(record_batch_slice, deserialized_batch);
3022    }
3023
3024    #[test]
3025    fn truncate_ipc_dictionary_array() {
3026        fn create_batch() -> RecordBatch {
3027            let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
3028                .into_iter()
3029                .collect();
3030            let keys: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
3031
3032            let array = DictionaryArray::new(keys, Arc::new(values));
3033
3034            let schema = Schema::new(vec![Field::new("dict", array.data_type().clone(), true)]);
3035
3036            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
3037        }
3038
3039        let record_batch = create_batch();
3040        let record_batch_slice = record_batch.slice(1, 2);
3041        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
3042
3043        assert!(
3044            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
3045        );
3046
3047        assert!(deserialized_batch.column(0).is_valid(0));
3048        assert!(deserialized_batch.column(0).is_null(1));
3049
3050        assert_eq!(record_batch_slice, deserialized_batch);
3051    }
3052
3053    #[test]
3054    fn truncate_ipc_struct_array() {
3055        fn create_batch() -> RecordBatch {
3056            let strings: StringArray = [Some("foo"), None, Some("bar"), Some("baz")]
3057                .into_iter()
3058                .collect();
3059            let ints: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
3060
3061            let struct_array = StructArray::from(vec![
3062                (
3063                    Arc::new(Field::new("s", DataType::Utf8, true)),
3064                    Arc::new(strings) as ArrayRef,
3065                ),
3066                (
3067                    Arc::new(Field::new("c", DataType::Int32, true)),
3068                    Arc::new(ints) as ArrayRef,
3069                ),
3070            ]);
3071
3072            let schema = Schema::new(vec![Field::new(
3073                "struct_array",
3074                struct_array.data_type().clone(),
3075                true,
3076            )]);
3077
3078            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(struct_array)]).unwrap()
3079        }
3080
3081        let record_batch = create_batch();
3082        let record_batch_slice = record_batch.slice(1, 2);
3083        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
3084
3085        assert!(
3086            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
3087        );
3088
3089        let structs = deserialized_batch
3090            .column(0)
3091            .as_any()
3092            .downcast_ref::<StructArray>()
3093            .unwrap();
3094
3095        assert!(structs.column(0).is_null(0));
3096        assert!(structs.column(0).is_valid(1));
3097        assert!(structs.column(1).is_valid(0));
3098        assert!(structs.column(1).is_null(1));
3099        assert_eq!(record_batch_slice, deserialized_batch);
3100    }
3101
3102    #[test]
3103    fn truncate_ipc_string_array_with_all_empty_string() {
3104        fn create_batch() -> RecordBatch {
3105            let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
3106            let a = StringArray::from(vec![Some(""), Some(""), Some(""), Some(""), Some("")]);
3107            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap()
3108        }
3109
3110        let record_batch = create_batch();
3111        let record_batch_slice = record_batch.slice(0, 1);
3112        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
3113
3114        assert!(
3115            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
3116        );
3117        assert_eq!(record_batch_slice, deserialized_batch);
3118    }
3119
3120    #[test]
3121    fn test_stream_writer_writes_array_slice() {
3122        let array = UInt32Array::from(vec![Some(1), Some(2), Some(3)]);
3123        assert_eq!(
3124            vec![Some(1), Some(2), Some(3)],
3125            array.iter().collect::<Vec<_>>()
3126        );
3127
3128        let sliced = array.slice(1, 2);
3129        assert_eq!(vec![Some(2), Some(3)], sliced.iter().collect::<Vec<_>>());
3130
3131        let batch = RecordBatch::try_new(
3132            Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)])),
3133            vec![Arc::new(sliced)],
3134        )
3135        .expect("new batch");
3136
3137        let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
3138        writer.write(&batch).expect("write");
3139        let outbuf = writer.into_inner().expect("inner");
3140
3141        let mut reader = StreamReader::try_new(&outbuf[..], None).expect("new reader");
3142        let read_batch = reader.next().unwrap().expect("read batch");
3143
3144        let read_array: &UInt32Array = read_batch.column(0).as_primitive();
3145        assert_eq!(
3146            vec![Some(2), Some(3)],
3147            read_array.iter().collect::<Vec<_>>()
3148        );
3149    }
3150
3151    #[test]
3152    fn test_large_slice_uint32() {
3153        ensure_roundtrip(Arc::new(UInt32Array::from_iter(
3154            (0..8000).map(|i| if i % 2 == 0 { Some(i) } else { None }),
3155        )));
3156    }
3157
3158    #[test]
3159    fn test_large_slice_string() {
3160        let strings: Vec<_> = (0..8000)
3161            .map(|i| {
3162                if i % 2 == 0 {
3163                    Some(format!("value{i}"))
3164                } else {
3165                    None
3166                }
3167            })
3168            .collect();
3169
3170        ensure_roundtrip(Arc::new(StringArray::from(strings)));
3171    }
3172
3173    #[test]
3174    fn test_large_slice_string_list() {
3175        let mut ls = ListBuilder::new(StringBuilder::new());
3176
3177        let mut s = String::new();
3178        for row_number in 0..8000 {
3179            if row_number % 2 == 0 {
3180                for list_element in 0..1000 {
3181                    s.clear();
3182                    use std::fmt::Write;
3183                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
3184                    ls.values().append_value(&s);
3185                }
3186                ls.append(true)
3187            } else {
3188                ls.append(false); // null
3189            }
3190        }
3191
3192        ensure_roundtrip(Arc::new(ls.finish()));
3193    }
3194
3195    #[test]
3196    fn test_large_slice_string_list_of_lists() {
3197        // The reason for the special test is to verify reencode_offsets which looks both at
3198        // the starting offset and the data offset.  So need a dataset where the starting_offset
3199        // is zero but the data offset is not.
3200        let mut ls = ListBuilder::new(ListBuilder::new(StringBuilder::new()));
3201
3202        for _ in 0..4000 {
3203            ls.values().append(true);
3204            ls.append(true)
3205        }
3206
3207        let mut s = String::new();
3208        for row_number in 0..4000 {
3209            if row_number % 2 == 0 {
3210                for list_element in 0..1000 {
3211                    s.clear();
3212                    use std::fmt::Write;
3213                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
3214                    ls.values().values().append_value(&s);
3215                }
3216                ls.values().append(true);
3217                ls.append(true)
3218            } else {
3219                ls.append(false); // null
3220            }
3221        }
3222
3223        ensure_roundtrip(Arc::new(ls.finish()));
3224    }
3225
3226    /// Read/write a record batch to a File and Stream and ensure it is the same at the outout
3227    fn ensure_roundtrip(array: ArrayRef) {
3228        let num_rows = array.len();
3229        let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
3230        // take off the first element
3231        let sliced_batch = orig_batch.slice(1, num_rows - 1);
3232
3233        let schema = orig_batch.schema();
3234        let stream_data = {
3235            let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
3236            writer.write(&sliced_batch).unwrap();
3237            writer.into_inner().unwrap()
3238        };
3239        let read_batch = {
3240            let projection = None;
3241            let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
3242            reader
3243                .next()
3244                .expect("expect no errors reading batch")
3245                .expect("expect batch")
3246        };
3247        assert_eq!(sliced_batch, read_batch);
3248
3249        let file_data = {
3250            let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
3251            writer.write(&sliced_batch).unwrap();
3252            writer.into_inner().unwrap().into_inner().unwrap()
3253        };
3254        let read_batch = {
3255            let projection = None;
3256            let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
3257            reader
3258                .next()
3259                .expect("expect no errors reading batch")
3260                .expect("expect batch")
3261        };
3262        assert_eq!(sliced_batch, read_batch);
3263
3264        // TODO test file writer/reader
3265    }
3266
3267    #[test]
3268    fn encode_bools_slice() {
3269        // Test case for https://github.com/apache/arrow-rs/issues/3496
3270        assert_bool_roundtrip([true, false], 1, 1);
3271
3272        // slice somewhere in the middle
3273        assert_bool_roundtrip(
3274            [
3275                true, false, true, true, false, false, true, true, true, false, false, false, true,
3276                true, true, true, false, false, false, false, true, true, true, true, true, false,
3277                false, false, false, false,
3278            ],
3279            13,
3280            17,
3281        );
3282
3283        // start at byte boundary, end in the middle
3284        assert_bool_roundtrip(
3285            [
3286                true, false, true, true, false, false, true, true, true, false, false, false,
3287            ],
3288            8,
3289            2,
3290        );
3291
3292        // start and stop and byte boundary
3293        assert_bool_roundtrip(
3294            [
3295                true, false, true, true, false, false, true, true, true, false, false, false, true,
3296                true, true, true, true, false, false, false, false, false,
3297            ],
3298            8,
3299            8,
3300        );
3301    }
3302
3303    fn assert_bool_roundtrip<const N: usize>(bools: [bool; N], offset: usize, length: usize) {
3304        let val_bool_field = Field::new("val", DataType::Boolean, false);
3305
3306        let schema = Arc::new(Schema::new(vec![val_bool_field]));
3307
3308        let bools = BooleanArray::from(bools.to_vec());
3309
3310        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(bools)]).unwrap();
3311        let batch = batch.slice(offset, length);
3312
3313        let data = serialize_stream(&batch);
3314        let batch2 = deserialize_stream(data);
3315        assert_eq!(batch, batch2);
3316    }
3317
3318    #[test]
3319    fn test_run_array_unslice() {
3320        let total_len = 80;
3321        let vals: Vec<Option<i32>> = vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
3322        let repeats: Vec<usize> = vec![3, 4, 1, 2];
3323        let mut input_array: Vec<Option<i32>> = Vec::with_capacity(total_len);
3324        for ix in 0_usize..32 {
3325            let repeat: usize = repeats[ix % repeats.len()];
3326            let val: Option<i32> = vals[ix % vals.len()];
3327            input_array.resize(input_array.len() + repeat, val);
3328        }
3329
3330        // Encode the input_array to run array
3331        let mut builder =
3332            PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
3333        builder.extend(input_array.iter().copied());
3334        let run_array = builder.finish();
3335
3336        // test for all slice lengths.
3337        for slice_len in 1..=total_len {
3338            // test for offset = 0, slice length = slice_len
3339            let sliced_run_array: RunArray<Int16Type> =
3340                run_array.slice(0, slice_len).into_data().into();
3341
3342            // Create unsliced run array.
3343            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
3344            let typed = unsliced_run_array
3345                .downcast::<PrimitiveArray<Int32Type>>()
3346                .unwrap();
3347            let expected: Vec<Option<i32>> = input_array.iter().take(slice_len).copied().collect();
3348            let actual: Vec<Option<i32>> = typed.into_iter().collect();
3349            assert_eq!(expected, actual);
3350
3351            // test for offset = total_len - slice_len, length = slice_len
3352            let sliced_run_array: RunArray<Int16Type> = run_array
3353                .slice(total_len - slice_len, slice_len)
3354                .into_data()
3355                .into();
3356
3357            // Create unsliced run array.
3358            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
3359            let typed = unsliced_run_array
3360                .downcast::<PrimitiveArray<Int32Type>>()
3361                .unwrap();
3362            let expected: Vec<Option<i32>> = input_array
3363                .iter()
3364                .skip(total_len - slice_len)
3365                .copied()
3366                .collect();
3367            let actual: Vec<Option<i32>> = typed.into_iter().collect();
3368            assert_eq!(expected, actual);
3369        }
3370    }
3371
3372    fn generate_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3373        let mut ls = GenericListBuilder::<O, _>::new(UInt32Builder::new());
3374
3375        for i in 0..100_000 {
3376            for value in [i, i, i] {
3377                ls.values().append_value(value);
3378            }
3379            ls.append(true)
3380        }
3381
3382        ls.finish()
3383    }
3384
3385    fn generate_utf8view_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3386        let mut ls = GenericListBuilder::<O, _>::new(StringViewBuilder::new());
3387
3388        for i in 0..100_000 {
3389            for value in [
3390                format!("value{}", i),
3391                format!("value{}", i),
3392                format!("value{}", i),
3393            ] {
3394                ls.values().append_value(&value);
3395            }
3396            ls.append(true)
3397        }
3398
3399        ls.finish()
3400    }
3401
3402    fn generate_string_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3403        let mut ls = GenericListBuilder::<O, _>::new(StringBuilder::new());
3404
3405        for i in 0..100_000 {
3406            for value in [
3407                format!("value{}", i),
3408                format!("value{}", i),
3409                format!("value{}", i),
3410            ] {
3411                ls.values().append_value(&value);
3412            }
3413            ls.append(true)
3414        }
3415
3416        ls.finish()
3417    }
3418
3419    fn generate_nested_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3420        let mut ls =
3421            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
3422
3423        for _i in 0..10_000 {
3424            for j in 0..10 {
3425                for value in [j, j, j, j] {
3426                    ls.values().values().append_value(value);
3427                }
3428                ls.values().append(true)
3429            }
3430            ls.append(true);
3431        }
3432
3433        ls.finish()
3434    }
3435
3436    fn generate_nested_list_data_starting_at_zero<O: OffsetSizeTrait>() -> GenericListArray<O> {
3437        let mut ls =
3438            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
3439
3440        for _i in 0..999 {
3441            ls.values().append(true);
3442            ls.append(true);
3443        }
3444
3445        for j in 0..10 {
3446            for value in [j, j, j, j] {
3447                ls.values().values().append_value(value);
3448            }
3449            ls.values().append(true)
3450        }
3451        ls.append(true);
3452
3453        for i in 0..9_000 {
3454            for j in 0..10 {
3455                for value in [i + j, i + j, i + j, i + j] {
3456                    ls.values().values().append_value(value);
3457                }
3458                ls.values().append(true)
3459            }
3460            ls.append(true);
3461        }
3462
3463        ls.finish()
3464    }
3465
3466    fn generate_map_array_data() -> MapArray {
3467        let keys_builder = UInt32Builder::new();
3468        let values_builder = UInt32Builder::new();
3469
3470        let mut builder = MapBuilder::new(None, keys_builder, values_builder);
3471
3472        for i in 0..100_000 {
3473            for _j in 0..3 {
3474                builder.keys().append_value(i);
3475                builder.values().append_value(i * 2);
3476            }
3477            builder.append(true).unwrap();
3478        }
3479
3480        builder.finish()
3481    }
3482
3483    #[test]
3484    fn reencode_offsets_when_first_offset_is_not_zero() {
3485        let original_list = generate_list_data::<i32>();
3486        let original_data = original_list.into_data();
3487        let slice_data = original_data.slice(75, 7);
3488        let (new_offsets, original_start, length) =
3489            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
3490        assert_eq!(
3491            vec![0, 3, 6, 9, 12, 15, 18, 21],
3492            new_offsets.typed_data::<i32>()
3493        );
3494        assert_eq!(225, original_start);
3495        assert_eq!(21, length);
3496    }
3497
3498    #[test]
3499    fn reencode_offsets_when_first_offset_is_zero() {
3500        let mut ls = GenericListBuilder::<i32, _>::new(UInt32Builder::new());
3501        // ls = [[], [35, 42]
3502        ls.append(true);
3503        ls.values().append_value(35);
3504        ls.values().append_value(42);
3505        ls.append(true);
3506        let original_list = ls.finish();
3507        let original_data = original_list.into_data();
3508
3509        let slice_data = original_data.slice(1, 1);
3510        let (new_offsets, original_start, length) =
3511            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
3512        assert_eq!(vec![0, 2], new_offsets.typed_data::<i32>());
3513        assert_eq!(0, original_start);
3514        assert_eq!(2, length);
3515    }
3516
3517    /// Ensure when serde full & sliced versions they are equal to original input.
3518    /// Also ensure serialized sliced version is significantly smaller than serialized full.
3519    fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {
3520        // test both full and sliced versions
3521        let in_sliced = in_batch.slice(999, 1);
3522
3523        let bytes_batch = serialize_file(&in_batch);
3524        let bytes_sliced = serialize_file(&in_sliced);
3525
3526        // serializing 1 row should be significantly smaller than serializing 100,000
3527        assert!(bytes_sliced.len() < (bytes_batch.len() / expected_size_factor));
3528
3529        // ensure both are still valid and equal to originals
3530        let out_batch = deserialize_file(bytes_batch);
3531        assert_eq!(in_batch, out_batch);
3532
3533        let out_sliced = deserialize_file(bytes_sliced);
3534        assert_eq!(in_sliced, out_sliced);
3535    }
3536
3537    #[test]
3538    fn encode_lists() {
3539        let val_inner = Field::new_list_field(DataType::UInt32, true);
3540        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
3541        let schema = Arc::new(Schema::new(vec![val_list_field]));
3542
3543        let values = Arc::new(generate_list_data::<i32>());
3544
3545        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3546        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3547    }
3548
3549    #[test]
3550    fn encode_empty_list() {
3551        let val_inner = Field::new_list_field(DataType::UInt32, true);
3552        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
3553        let schema = Arc::new(Schema::new(vec![val_list_field]));
3554
3555        let values = Arc::new(generate_list_data::<i32>());
3556
3557        let in_batch = RecordBatch::try_new(schema, vec![values])
3558            .unwrap()
3559            .slice(999, 0);
3560        let out_batch = deserialize_file(serialize_file(&in_batch));
3561        assert_eq!(in_batch, out_batch);
3562    }
3563
3564    #[test]
3565    fn encode_large_lists() {
3566        let val_inner = Field::new_list_field(DataType::UInt32, true);
3567        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3568        let schema = Arc::new(Schema::new(vec![val_list_field]));
3569
3570        let values = Arc::new(generate_list_data::<i64>());
3571
3572        // ensure when serde full & sliced versions they are equal to original input
3573        // also ensure serialized sliced version is significantly smaller than serialized full
3574        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3575        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3576    }
3577
3578    #[test]
3579    fn encode_large_lists_non_zero_offset() {
3580        let val_inner = Field::new_list_field(DataType::UInt32, true);
3581        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3582        let schema = Arc::new(Schema::new(vec![val_list_field]));
3583
3584        let values = Arc::new(generate_list_data::<i64>());
3585
3586        check_sliced_list_array(schema, values);
3587    }
3588
3589    #[test]
3590    fn encode_large_lists_string_non_zero_offset() {
3591        let val_inner = Field::new_list_field(DataType::Utf8, true);
3592        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3593        let schema = Arc::new(Schema::new(vec![val_list_field]));
3594
3595        let values = Arc::new(generate_string_list_data::<i64>());
3596
3597        check_sliced_list_array(schema, values);
3598    }
3599
3600    #[test]
3601    fn encode_large_list_string_view_non_zero_offset() {
3602        let val_inner = Field::new_list_field(DataType::Utf8View, true);
3603        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3604        let schema = Arc::new(Schema::new(vec![val_list_field]));
3605
3606        let values = Arc::new(generate_utf8view_list_data::<i64>());
3607
3608        check_sliced_list_array(schema, values);
3609    }
3610
3611    fn check_sliced_list_array(schema: Arc<Schema>, values: Arc<GenericListArray<i64>>) {
3612        for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3613            let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3614                .unwrap()
3615                .slice(offset, len);
3616            let out_batch = deserialize_file(serialize_file(&in_batch));
3617            assert_eq!(in_batch, out_batch);
3618        }
3619    }
3620
3621    #[test]
3622    fn encode_nested_lists() {
3623        let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
3624        let inner_list_field = Arc::new(Field::new_list_field(DataType::List(inner_int), true));
3625        let list_field = Field::new("val", DataType::List(inner_list_field), true);
3626        let schema = Arc::new(Schema::new(vec![list_field]));
3627
3628        let values = Arc::new(generate_nested_list_data::<i32>());
3629
3630        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3631        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3632    }
3633
3634    #[test]
3635    fn encode_nested_lists_starting_at_zero() {
3636        let inner_int = Arc::new(Field::new("item", DataType::UInt32, true));
3637        let inner_list_field = Arc::new(Field::new("item", DataType::List(inner_int), true));
3638        let list_field = Field::new("val", DataType::List(inner_list_field), true);
3639        let schema = Arc::new(Schema::new(vec![list_field]));
3640
3641        let values = Arc::new(generate_nested_list_data_starting_at_zero::<i32>());
3642
3643        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3644        roundtrip_ensure_sliced_smaller(in_batch, 1);
3645    }
3646
3647    #[test]
3648    fn encode_map_array() {
3649        let keys = Arc::new(Field::new("keys", DataType::UInt32, false));
3650        let values = Arc::new(Field::new("values", DataType::UInt32, true));
3651        let map_field = Field::new_map("map", "entries", keys, values, false, true);
3652        let schema = Arc::new(Schema::new(vec![map_field]));
3653
3654        let values = Arc::new(generate_map_array_data());
3655
3656        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3657        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3658    }
3659
3660    fn generate_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
3661        let mut builder = GenericListViewBuilder::<O, _>::new(UInt32Builder::new());
3662
3663        for i in 0u32..100_000 {
3664            if i.is_multiple_of(10_000) {
3665                builder.append(false);
3666                continue;
3667            }
3668            for value in [i, i, i] {
3669                builder.values().append_value(value);
3670            }
3671            builder.append(true);
3672        }
3673
3674        builder.finish()
3675    }
3676
3677    #[test]
3678    fn encode_list_view_arrays() {
3679        let val_inner = Field::new_list_field(DataType::UInt32, true);
3680        let val_field = Field::new("val", DataType::ListView(Arc::new(val_inner)), true);
3681        let schema = Arc::new(Schema::new(vec![val_field]));
3682
3683        let values = Arc::new(generate_list_view_data::<i32>());
3684
3685        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3686        let out_batch = deserialize_file(serialize_file(&in_batch));
3687        assert_eq!(in_batch, out_batch);
3688    }
3689
3690    #[test]
3691    fn encode_large_list_view_arrays() {
3692        let val_inner = Field::new_list_field(DataType::UInt32, true);
3693        let val_field = Field::new("val", DataType::LargeListView(Arc::new(val_inner)), true);
3694        let schema = Arc::new(Schema::new(vec![val_field]));
3695
3696        let values = Arc::new(generate_list_view_data::<i64>());
3697
3698        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3699        let out_batch = deserialize_file(serialize_file(&in_batch));
3700        assert_eq!(in_batch, out_batch);
3701    }
3702
3703    #[test]
3704    fn check_sliced_list_view_array() {
3705        let inner = Field::new_list_field(DataType::UInt32, true);
3706        let field = Field::new("val", DataType::ListView(Arc::new(inner)), true);
3707        let schema = Arc::new(Schema::new(vec![field]));
3708        let values = Arc::new(generate_list_view_data::<i32>());
3709
3710        for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3711            let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3712                .unwrap()
3713                .slice(offset, len);
3714            let out_batch = deserialize_file(serialize_file(&in_batch));
3715            assert_eq!(in_batch, out_batch);
3716        }
3717    }
3718
3719    #[test]
3720    fn check_sliced_large_list_view_array() {
3721        let inner = Field::new_list_field(DataType::UInt32, true);
3722        let field = Field::new("val", DataType::LargeListView(Arc::new(inner)), true);
3723        let schema = Arc::new(Schema::new(vec![field]));
3724        let values = Arc::new(generate_list_view_data::<i64>());
3725
3726        for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3727            let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3728                .unwrap()
3729                .slice(offset, len);
3730            let out_batch = deserialize_file(serialize_file(&in_batch));
3731            assert_eq!(in_batch, out_batch);
3732        }
3733    }
3734
3735    fn generate_nested_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
3736        let inner_builder = UInt32Builder::new();
3737        let middle_builder = GenericListViewBuilder::<O, _>::new(inner_builder);
3738        let mut outer_builder = GenericListViewBuilder::<O, _>::new(middle_builder);
3739
3740        for i in 0u32..10_000 {
3741            if i.is_multiple_of(1_000) {
3742                outer_builder.append(false);
3743                continue;
3744            }
3745
3746            for _ in 0..3 {
3747                for value in [i, i + 1, i + 2] {
3748                    outer_builder.values().values().append_value(value);
3749                }
3750                outer_builder.values().append(true);
3751            }
3752            outer_builder.append(true);
3753        }
3754
3755        outer_builder.finish()
3756    }
3757
3758    #[test]
3759    fn encode_nested_list_views() {
3760        let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
3761        let inner_list_field = Arc::new(Field::new_list_field(DataType::ListView(inner_int), true));
3762        let list_field = Field::new("val", DataType::ListView(inner_list_field), true);
3763        let schema = Arc::new(Schema::new(vec![list_field]));
3764
3765        let values = Arc::new(generate_nested_list_view_data::<i32>());
3766
3767        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3768        let out_batch = deserialize_file(serialize_file(&in_batch));
3769        assert_eq!(in_batch, out_batch);
3770    }
3771
3772    fn test_roundtrip_list_view_of_dict_impl<OffsetSize: OffsetSizeTrait, U: ArrowNativeType>(
3773        list_data_type: DataType,
3774        offsets: &[U; 5],
3775        sizes: &[U; 4],
3776    ) {
3777        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3778        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
3779        let dict_array = DictionaryArray::new(keys, Arc::new(values));
3780        let dict_data = dict_array.to_data();
3781
3782        let value_offsets = Buffer::from_slice_ref(offsets);
3783        let value_sizes = Buffer::from_slice_ref(sizes);
3784
3785        let list_data = ArrayData::builder(list_data_type)
3786            .len(4)
3787            .add_buffer(value_offsets)
3788            .add_buffer(value_sizes)
3789            .add_child_data(dict_data)
3790            .build()
3791            .unwrap();
3792        let list_view_array = GenericListViewArray::<OffsetSize>::from(list_data);
3793
3794        let schema = Arc::new(Schema::new(vec![Field::new(
3795            "f1",
3796            list_view_array.data_type().clone(),
3797            false,
3798        )]));
3799        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(list_view_array)]).unwrap();
3800
3801        let output_batch = deserialize_file(serialize_file(&input_batch));
3802        assert_eq!(input_batch, output_batch);
3803
3804        let output_batch = deserialize_stream(serialize_stream(&input_batch));
3805        assert_eq!(input_batch, output_batch);
3806    }
3807
3808    #[test]
3809    fn test_roundtrip_list_view_of_dict() {
3810        #[allow(deprecated)]
3811        let list_data_type = DataType::ListView(Arc::new(Field::new_dict(
3812            "item",
3813            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3814            true,
3815            1,
3816            false,
3817        )));
3818        let offsets: &[i32; 5] = &[0, 2, 4, 4, 7];
3819        let sizes: &[i32; 4] = &[2, 2, 0, 3];
3820        test_roundtrip_list_view_of_dict_impl::<i32, i32>(list_data_type, offsets, sizes);
3821    }
3822
3823    #[test]
3824    fn test_roundtrip_large_list_view_of_dict() {
3825        #[allow(deprecated)]
3826        let list_data_type = DataType::LargeListView(Arc::new(Field::new_dict(
3827            "item",
3828            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3829            true,
3830            2,
3831            false,
3832        )));
3833        let offsets: &[i64; 5] = &[0, 2, 4, 4, 7];
3834        let sizes: &[i64; 4] = &[2, 2, 0, 3];
3835        test_roundtrip_list_view_of_dict_impl::<i64, i64>(list_data_type, offsets, sizes);
3836    }
3837
3838    #[test]
3839    fn test_roundtrip_sliced_list_view_of_dict() {
3840        #[allow(deprecated)]
3841        let list_data_type = DataType::ListView(Arc::new(Field::new_dict(
3842            "item",
3843            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3844            true,
3845            3,
3846            false,
3847        )));
3848
3849        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3850        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2, 1, 0, 3, 2, 1]);
3851        let dict_array = DictionaryArray::new(keys, Arc::new(values));
3852        let dict_data = dict_array.to_data();
3853
3854        let offsets: &[i32; 7] = &[0, 2, 4, 4, 7, 9, 12];
3855        let sizes: &[i32; 6] = &[2, 2, 0, 3, 2, 3];
3856        let value_offsets = Buffer::from_slice_ref(offsets);
3857        let value_sizes = Buffer::from_slice_ref(sizes);
3858
3859        let list_data = ArrayData::builder(list_data_type)
3860            .len(6)
3861            .add_buffer(value_offsets)
3862            .add_buffer(value_sizes)
3863            .add_child_data(dict_data)
3864            .build()
3865            .unwrap();
3866        let list_view_array = GenericListViewArray::<i32>::from(list_data);
3867
3868        let schema = Arc::new(Schema::new(vec![Field::new(
3869            "f1",
3870            list_view_array.data_type().clone(),
3871            false,
3872        )]));
3873        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(list_view_array)]).unwrap();
3874
3875        let sliced_batch = input_batch.slice(1, 4);
3876
3877        let output_batch = deserialize_file(serialize_file(&sliced_batch));
3878        assert_eq!(sliced_batch, output_batch);
3879
3880        let output_batch = deserialize_stream(serialize_stream(&sliced_batch));
3881        assert_eq!(sliced_batch, output_batch);
3882    }
3883
3884    #[test]
3885    fn test_roundtrip_dense_union_of_dict() {
3886        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3887        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
3888        let dict_array = DictionaryArray::new(keys, Arc::new(values));
3889
3890        #[allow(deprecated)]
3891        let dict_field = Arc::new(Field::new_dict(
3892            "dict",
3893            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3894            true,
3895            1,
3896            false,
3897        ));
3898        let int_field = Arc::new(Field::new("int", DataType::Int32, false));
3899        let union_fields = UnionFields::try_new(vec![0, 1], vec![dict_field, int_field]).unwrap();
3900
3901        let types = ScalarBuffer::from(vec![0i8, 0, 1, 0, 1, 0, 0]);
3902        let offsets = ScalarBuffer::from(vec![0i32, 1, 0, 2, 1, 3, 4]);
3903
3904        let int_array = Int32Array::from(vec![100, 200]);
3905
3906        let union = UnionArray::try_new(
3907            union_fields.clone(),
3908            types,
3909            Some(offsets),
3910            vec![Arc::new(dict_array), Arc::new(int_array)],
3911        )
3912        .unwrap();
3913
3914        let schema = Arc::new(Schema::new(vec![Field::new(
3915            "union",
3916            DataType::Union(union_fields, UnionMode::Dense),
3917            false,
3918        )]));
3919        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
3920
3921        let output_batch = deserialize_file(serialize_file(&input_batch));
3922        assert_eq!(input_batch, output_batch);
3923
3924        let output_batch = deserialize_stream(serialize_stream(&input_batch));
3925        assert_eq!(input_batch, output_batch);
3926    }
3927
3928    #[test]
3929    fn test_roundtrip_sparse_union_of_dict() {
3930        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3931        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
3932        let dict_array = DictionaryArray::new(keys, Arc::new(values));
3933
3934        #[allow(deprecated)]
3935        let dict_field = Arc::new(Field::new_dict(
3936            "dict",
3937            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3938            true,
3939            2,
3940            false,
3941        ));
3942        let int_field = Arc::new(Field::new("int", DataType::Int32, false));
3943        let union_fields = UnionFields::try_new(vec![0, 1], vec![dict_field, int_field]).unwrap();
3944
3945        let types = ScalarBuffer::from(vec![0i8, 0, 1, 0, 1, 0, 0]);
3946
3947        let int_array = Int32Array::from(vec![0, 0, 100, 0, 200, 0, 0]);
3948
3949        let union = UnionArray::try_new(
3950            union_fields.clone(),
3951            types,
3952            None,
3953            vec![Arc::new(dict_array), Arc::new(int_array)],
3954        )
3955        .unwrap();
3956
3957        let schema = Arc::new(Schema::new(vec![Field::new(
3958            "union",
3959            DataType::Union(union_fields, UnionMode::Sparse),
3960            false,
3961        )]));
3962        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
3963
3964        let output_batch = deserialize_file(serialize_file(&input_batch));
3965        assert_eq!(input_batch, output_batch);
3966
3967        let output_batch = deserialize_stream(serialize_stream(&input_batch));
3968        assert_eq!(input_batch, output_batch);
3969    }
3970
3971    #[test]
3972    fn test_roundtrip_map_with_dict_keys() {
3973        // Building a map array is a bit involved. We first build a struct arary that has a key and
3974        // value field and then use that to build the actual map array.
3975        let key_values = StringArray::from(vec!["key_a", "key_b", "key_c"]);
3976        let keys = Int32Array::from_iter_values([0, 1, 2, 0, 1, 0]);
3977        let dict_keys = DictionaryArray::new(keys, Arc::new(key_values));
3978
3979        let values = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
3980
3981        #[allow(deprecated)]
3982        let entries_field = Arc::new(Field::new(
3983            "entries",
3984            DataType::Struct(
3985                vec![
3986                    Field::new_dict(
3987                        "key",
3988                        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3989                        false,
3990                        1,
3991                        false,
3992                    ),
3993                    Field::new("value", DataType::Int32, true),
3994                ]
3995                .into(),
3996            ),
3997            false,
3998        ));
3999
4000        let entries = StructArray::from(vec![
4001            (
4002                Arc::new(Field::new(
4003                    "key",
4004                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
4005                    false,
4006                )),
4007                Arc::new(dict_keys) as ArrayRef,
4008            ),
4009            (
4010                Arc::new(Field::new("value", DataType::Int32, true)),
4011                Arc::new(values) as ArrayRef,
4012            ),
4013        ]);
4014
4015        let offsets = Buffer::from_slice_ref([0i32, 2, 4, 6]);
4016
4017        let map_data = ArrayData::builder(DataType::Map(entries_field, false))
4018            .len(3)
4019            .add_buffer(offsets)
4020            .add_child_data(entries.into_data())
4021            .build()
4022            .unwrap();
4023        let map_array = MapArray::from(map_data);
4024
4025        let schema = Arc::new(Schema::new(vec![Field::new(
4026            "map",
4027            map_array.data_type().clone(),
4028            false,
4029        )]));
4030        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(map_array)]).unwrap();
4031
4032        let output_batch = deserialize_file(serialize_file(&input_batch));
4033        assert_eq!(input_batch, output_batch);
4034
4035        let output_batch = deserialize_stream(serialize_stream(&input_batch));
4036        assert_eq!(input_batch, output_batch);
4037    }
4038
4039    #[test]
4040    fn test_roundtrip_map_with_dict_values() {
4041        // Building a map array is a bit involved. We first build a struct arary that has a key and
4042        // value field and then use that to build the actual map array.
4043        let keys = StringArray::from(vec!["a", "b", "c", "d", "e", "f"]);
4044
4045        let value_values = StringArray::from(vec!["val_x", "val_y", "val_z"]);
4046        let value_keys = Int32Array::from_iter_values([0, 1, 2, 0, 1, 0]);
4047        let dict_values = DictionaryArray::new(value_keys, Arc::new(value_values));
4048
4049        #[allow(deprecated)]
4050        let entries_field = Arc::new(Field::new(
4051            "entries",
4052            DataType::Struct(
4053                vec![
4054                    Field::new("key", DataType::Utf8, false),
4055                    Field::new_dict(
4056                        "value",
4057                        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
4058                        true,
4059                        2,
4060                        false,
4061                    ),
4062                ]
4063                .into(),
4064            ),
4065            false,
4066        ));
4067
4068        let entries = StructArray::from(vec![
4069            (
4070                Arc::new(Field::new("key", DataType::Utf8, false)),
4071                Arc::new(keys) as ArrayRef,
4072            ),
4073            (
4074                Arc::new(Field::new(
4075                    "value",
4076                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
4077                    true,
4078                )),
4079                Arc::new(dict_values) as ArrayRef,
4080            ),
4081        ]);
4082
4083        let offsets = Buffer::from_slice_ref([0i32, 2, 4, 6]);
4084
4085        let map_data = ArrayData::builder(DataType::Map(entries_field, false))
4086            .len(3)
4087            .add_buffer(offsets)
4088            .add_child_data(entries.into_data())
4089            .build()
4090            .unwrap();
4091        let map_array = MapArray::from(map_data);
4092
4093        let schema = Arc::new(Schema::new(vec![Field::new(
4094            "map",
4095            map_array.data_type().clone(),
4096            false,
4097        )]));
4098        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(map_array)]).unwrap();
4099
4100        let output_batch = deserialize_file(serialize_file(&input_batch));
4101        assert_eq!(input_batch, output_batch);
4102
4103        let output_batch = deserialize_stream(serialize_stream(&input_batch));
4104        assert_eq!(input_batch, output_batch);
4105    }
4106
4107    #[test]
4108    fn test_decimal128_alignment16_is_sufficient() {
4109        const IPC_ALIGNMENT: usize = 16;
4110
4111        // Test a bunch of different dimensions to ensure alignment is never an issue.
4112        // For example, if we only test `num_cols = 1` then even with alignment 8 this
4113        // test would _happen_ to pass, even though for different dimensions like
4114        // `num_cols = 2` it would fail.
4115        for num_cols in [1, 2, 3, 17, 50, 73, 99] {
4116            let num_rows = (num_cols * 7 + 11) % 100; // Deterministic swizzle
4117
4118            let mut fields = Vec::new();
4119            let mut arrays = Vec::new();
4120            for i in 0..num_cols {
4121                let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
4122                let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
4123                fields.push(field);
4124                arrays.push(Arc::new(array) as Arc<dyn Array>);
4125            }
4126            let schema = Schema::new(fields);
4127            let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();
4128
4129            let mut writer = FileWriter::try_new_with_options(
4130                Vec::new(),
4131                batch.schema_ref(),
4132                IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
4133            )
4134            .unwrap();
4135            writer.write(&batch).unwrap();
4136            writer.finish().unwrap();
4137
4138            let out: Vec<u8> = writer.into_inner().unwrap();
4139
4140            let buffer = Buffer::from_vec(out);
4141            let trailer_start = buffer.len() - 10;
4142            let footer_len =
4143                read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
4144            let footer =
4145                root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
4146
4147            let schema = fb_to_schema(footer.schema().unwrap());
4148
4149            // Importantly we set `require_alignment`, checking that 16-byte alignment is sufficient
4150            // for `read_record_batch` later on to read the data in a zero-copy manner.
4151            let decoder =
4152                FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);
4153
4154            let batches = footer.recordBatches().unwrap();
4155
4156            let block = batches.get(0);
4157            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
4158            let data = buffer.slice_with_length(block.offset() as _, block_len);
4159
4160            let batch2 = decoder.read_record_batch(block, &data).unwrap().unwrap();
4161
4162            assert_eq!(batch, batch2);
4163        }
4164    }
4165
4166    #[test]
4167    fn test_decimal128_alignment8_is_unaligned() {
4168        const IPC_ALIGNMENT: usize = 8;
4169
4170        let num_cols = 2;
4171        let num_rows = 1;
4172
4173        let mut fields = Vec::new();
4174        let mut arrays = Vec::new();
4175        for i in 0..num_cols {
4176            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
4177            let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
4178            fields.push(field);
4179            arrays.push(Arc::new(array) as Arc<dyn Array>);
4180        }
4181        let schema = Schema::new(fields);
4182        let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();
4183
4184        let mut writer = FileWriter::try_new_with_options(
4185            Vec::new(),
4186            batch.schema_ref(),
4187            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
4188        )
4189        .unwrap();
4190        writer.write(&batch).unwrap();
4191        writer.finish().unwrap();
4192
4193        let out: Vec<u8> = writer.into_inner().unwrap();
4194
4195        let buffer = Buffer::from_vec(out);
4196        let trailer_start = buffer.len() - 10;
4197        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
4198        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
4199        let schema = fb_to_schema(footer.schema().unwrap());
4200
4201        // Importantly we set `require_alignment`, otherwise the error later is suppressed due to copying
4202        // to an aligned buffer in `ArrayDataBuilder.build_aligned`.
4203        let decoder =
4204            FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);
4205
4206        let batches = footer.recordBatches().unwrap();
4207
4208        let block = batches.get(0);
4209        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
4210        let data = buffer.slice_with_length(block.offset() as _, block_len);
4211
4212        let result = decoder.read_record_batch(block, &data);
4213
4214        let error = result.unwrap_err();
4215        assert_eq!(
4216            error.to_string(),
4217            "Invalid argument error: Misaligned buffers[0] in array of type Decimal128(38, 10), \
4218             offset from expected alignment of 16 by 8"
4219        );
4220    }
4221
4222    #[test]
4223    fn test_flush() {
4224        // We write a schema which is small enough to fit into a buffer and not get flushed,
4225        // and then force the write with .flush().
4226        let num_cols = 2;
4227        let mut fields = Vec::new();
4228        let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
4229        for i in 0..num_cols {
4230            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
4231            fields.push(field);
4232        }
4233        let schema = Schema::new(fields);
4234        let inner_stream_writer = BufWriter::with_capacity(1024, Vec::new());
4235        let inner_file_writer = BufWriter::with_capacity(1024, Vec::new());
4236        let mut stream_writer =
4237            StreamWriter::try_new_with_options(inner_stream_writer, &schema, options.clone())
4238                .unwrap();
4239        let mut file_writer =
4240            FileWriter::try_new_with_options(inner_file_writer, &schema, options).unwrap();
4241
4242        let stream_bytes_written_on_new = stream_writer.get_ref().get_ref().len();
4243        let file_bytes_written_on_new = file_writer.get_ref().get_ref().len();
4244        stream_writer.flush().unwrap();
4245        file_writer.flush().unwrap();
4246        let stream_bytes_written_on_flush = stream_writer.get_ref().get_ref().len();
4247        let file_bytes_written_on_flush = file_writer.get_ref().get_ref().len();
4248        let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
4249        // Finishing a stream writes the continuation bytes in MetadataVersion::V5 (4 bytes)
4250        // and then a length of 0 (4 bytes) for a total of 8 bytes.
4251        // Everything before that should have been flushed in the .flush() call.
4252        let expected_stream_flushed_bytes = stream_out.len() - 8;
4253        // A file write is the same as the stream write except for the leading magic string
4254        // ARROW1 plus padding, which is 8 bytes.
4255        let expected_file_flushed_bytes = expected_stream_flushed_bytes + 8;
4256
4257        assert!(
4258            stream_bytes_written_on_new < stream_bytes_written_on_flush,
4259            "this test makes no sense if flush is not actually required"
4260        );
4261        assert!(
4262            file_bytes_written_on_new < file_bytes_written_on_flush,
4263            "this test makes no sense if flush is not actually required"
4264        );
4265        assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
4266        assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
4267    }
4268
4269    #[test]
4270    fn test_roundtrip_list_of_fixed_list() -> Result<(), ArrowError> {
4271        let l1_type =
4272            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, false)), 3);
4273        let l2_type = DataType::List(Arc::new(Field::new("item", l1_type.clone(), false)));
4274
4275        let l0_builder = Float32Builder::new();
4276        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3).with_field(Arc::new(Field::new(
4277            "item",
4278            DataType::Float32,
4279            false,
4280        )));
4281        let mut l2_builder =
4282            ListBuilder::new(l1_builder).with_field(Arc::new(Field::new("item", l1_type, false)));
4283
4284        for point in [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]] {
4285            l2_builder.values().values().append_value(point[0]);
4286            l2_builder.values().values().append_value(point[1]);
4287            l2_builder.values().values().append_value(point[2]);
4288
4289            l2_builder.values().append(true);
4290        }
4291        l2_builder.append(true);
4292
4293        let point = [10., 11., 12.];
4294        l2_builder.values().values().append_value(point[0]);
4295        l2_builder.values().values().append_value(point[1]);
4296        l2_builder.values().values().append_value(point[2]);
4297
4298        l2_builder.values().append(true);
4299        l2_builder.append(true);
4300
4301        let array = Arc::new(l2_builder.finish()) as ArrayRef;
4302
4303        let schema = Arc::new(Schema::new_with_metadata(
4304            vec![Field::new("points", l2_type, false)],
4305            HashMap::default(),
4306        ));
4307
4308        // Test a variety of combinations that include 0 and non-zero offsets
4309        // and also portions or the rest of the array
4310        test_slices(&array, &schema, 0, 1)?;
4311        test_slices(&array, &schema, 0, 2)?;
4312        test_slices(&array, &schema, 1, 1)?;
4313
4314        Ok(())
4315    }
4316
4317    #[test]
4318    fn test_roundtrip_list_of_fixed_list_w_nulls() -> Result<(), ArrowError> {
4319        let l0_builder = Float32Builder::new();
4320        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3);
4321        let mut l2_builder = ListBuilder::new(l1_builder);
4322
4323        for point in [
4324            [Some(1.0), Some(2.0), None],
4325            [Some(4.0), Some(5.0), Some(6.0)],
4326            [None, Some(8.0), Some(9.0)],
4327        ] {
4328            for p in point {
4329                match p {
4330                    Some(p) => l2_builder.values().values().append_value(p),
4331                    None => l2_builder.values().values().append_null(),
4332                }
4333            }
4334
4335            l2_builder.values().append(true);
4336        }
4337        l2_builder.append(true);
4338
4339        let point = [Some(10.), None, None];
4340        for p in point {
4341            match p {
4342                Some(p) => l2_builder.values().values().append_value(p),
4343                None => l2_builder.values().values().append_null(),
4344            }
4345        }
4346
4347        l2_builder.values().append(true);
4348        l2_builder.append(true);
4349
4350        let array = Arc::new(l2_builder.finish()) as ArrayRef;
4351
4352        let schema = Arc::new(Schema::new_with_metadata(
4353            vec![Field::new(
4354                "points",
4355                DataType::List(Arc::new(Field::new(
4356                    "item",
4357                    DataType::FixedSizeList(
4358                        Arc::new(Field::new("item", DataType::Float32, true)),
4359                        3,
4360                    ),
4361                    true,
4362                ))),
4363                true,
4364            )],
4365            HashMap::default(),
4366        ));
4367
4368        // Test a variety of combinations that include 0 and non-zero offsets
4369        // and also portions or the rest of the array
4370        test_slices(&array, &schema, 0, 1)?;
4371        test_slices(&array, &schema, 0, 2)?;
4372        test_slices(&array, &schema, 1, 1)?;
4373
4374        Ok(())
4375    }
4376
4377    fn test_slices(
4378        parent_array: &ArrayRef,
4379        schema: &SchemaRef,
4380        offset: usize,
4381        length: usize,
4382    ) -> Result<(), ArrowError> {
4383        let subarray = parent_array.slice(offset, length);
4384        let original_batch = RecordBatch::try_new(schema.clone(), vec![subarray])?;
4385
4386        let mut bytes = Vec::new();
4387        let mut writer = StreamWriter::try_new(&mut bytes, schema)?;
4388        writer.write(&original_batch)?;
4389        writer.finish()?;
4390
4391        let mut cursor = std::io::Cursor::new(bytes);
4392        let mut reader = StreamReader::try_new(&mut cursor, None)?;
4393        let returned_batch = reader.next().unwrap()?;
4394
4395        assert_eq!(original_batch, returned_batch);
4396
4397        Ok(())
4398    }
4399
4400    #[test]
4401    fn test_roundtrip_fixed_list() -> Result<(), ArrowError> {
4402        let int_builder = Int64Builder::new();
4403        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3)
4404            .with_field(Arc::new(Field::new("item", DataType::Int64, false)));
4405
4406        for point in [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]] {
4407            fixed_list_builder.values().append_value(point[0]);
4408            fixed_list_builder.values().append_value(point[1]);
4409            fixed_list_builder.values().append_value(point[2]);
4410
4411            fixed_list_builder.append(true);
4412        }
4413
4414        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;
4415
4416        let schema = Arc::new(Schema::new_with_metadata(
4417            vec![Field::new(
4418                "points",
4419                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, false)), 3),
4420                false,
4421            )],
4422            HashMap::default(),
4423        ));
4424
4425        // Test a variety of combinations that include 0 and non-zero offsets
4426        // and also portions or the rest of the array
4427        test_slices(&array, &schema, 0, 4)?;
4428        test_slices(&array, &schema, 0, 2)?;
4429        test_slices(&array, &schema, 1, 3)?;
4430        test_slices(&array, &schema, 2, 1)?;
4431
4432        Ok(())
4433    }
4434
4435    #[test]
4436    fn test_roundtrip_fixed_list_w_nulls() -> Result<(), ArrowError> {
4437        let int_builder = Int64Builder::new();
4438        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3);
4439
4440        for point in [
4441            [Some(1), Some(2), None],
4442            [Some(4), Some(5), Some(6)],
4443            [None, Some(8), Some(9)],
4444            [Some(10), None, None],
4445        ] {
4446            for p in point {
4447                match p {
4448                    Some(p) => fixed_list_builder.values().append_value(p),
4449                    None => fixed_list_builder.values().append_null(),
4450                }
4451            }
4452
4453            fixed_list_builder.append(true);
4454        }
4455
4456        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;
4457
4458        let schema = Arc::new(Schema::new_with_metadata(
4459            vec![Field::new(
4460                "points",
4461                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 3),
4462                true,
4463            )],
4464            HashMap::default(),
4465        ));
4466
4467        // Test a variety of combinations that include 0 and non-zero offsets
4468        // and also portions or the rest of the array
4469        test_slices(&array, &schema, 0, 4)?;
4470        test_slices(&array, &schema, 0, 2)?;
4471        test_slices(&array, &schema, 1, 3)?;
4472        test_slices(&array, &schema, 2, 1)?;
4473
4474        Ok(())
4475    }
4476
4477    #[test]
4478    fn test_metadata_encoding_ordering() {
4479        fn create_hash() -> u64 {
4480            let metadata: HashMap<String, String> = [
4481                ("a", "1"), //
4482                ("b", "2"), //
4483                ("c", "3"), //
4484                ("d", "4"), //
4485                ("e", "5"), //
4486            ]
4487            .into_iter()
4488            .map(|(k, v)| (k.to_owned(), v.to_owned()))
4489            .collect();
4490
4491            // Set metadata on both the schema and a field within it.
4492            let schema = Arc::new(
4493                Schema::new(vec![
4494                    Field::new("a", DataType::Int64, true).with_metadata(metadata.clone()),
4495                ])
4496                .with_metadata(metadata)
4497                .clone(),
4498            );
4499            let batch = RecordBatch::new_empty(schema.clone());
4500
4501            let mut bytes = Vec::new();
4502            let mut w = StreamWriter::try_new(&mut bytes, batch.schema_ref()).unwrap();
4503            w.write(&batch).unwrap();
4504            w.finish().unwrap();
4505
4506            let mut h = std::hash::DefaultHasher::new();
4507            h.write(&bytes);
4508            h.finish()
4509        }
4510
4511        let expected = create_hash();
4512
4513        // Since there is randomness in the HashMap and we cannot specify our
4514        // own Hasher for the implementation used for metadata, run the above
4515        // code 20x and verify it does not change. This is not perfect but it
4516        // should be good enough.
4517        let all_passed = (0..20).all(|_| create_hash() == expected);
4518        assert!(all_passed);
4519    }
4520
4521    #[test]
4522    fn test_dictionary_tracker_reset() {
4523        let data_gen = IpcDataGenerator::default();
4524        let mut dictionary_tracker = DictionaryTracker::new(false);
4525        let writer_options = IpcWriteOptions::default();
4526        let mut compression_ctx = CompressionContext::default();
4527
4528        let schema = Arc::new(Schema::new(vec![Field::new(
4529            "a",
4530            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
4531            false,
4532        )]));
4533
4534        let mut write_single_batch_stream =
4535            |batch: RecordBatch, dict_tracker: &mut DictionaryTracker| -> Vec<u8> {
4536                let mut buffer = Vec::new();
4537
4538                // create a new IPC stream:
4539                let stream_header = data_gen.schema_to_bytes_with_dictionary_tracker(
4540                    &schema,
4541                    dict_tracker,
4542                    &writer_options,
4543                );
4544                _ = write_message(&mut buffer, stream_header, &writer_options).unwrap();
4545
4546                let (encoded_dicts, encoded_batch) = data_gen
4547                    .encode(&batch, dict_tracker, &writer_options, &mut compression_ctx)
4548                    .unwrap();
4549                for encoded_dict in encoded_dicts {
4550                    _ = write_message(&mut buffer, encoded_dict, &writer_options).unwrap();
4551                }
4552                _ = write_message(&mut buffer, encoded_batch, &writer_options).unwrap();
4553
4554                buffer
4555            };
4556
4557        let batch1 = RecordBatch::try_new(
4558            schema.clone(),
4559            vec![Arc::new(DictionaryArray::new(
4560                UInt8Array::from_iter_values([0]),
4561                Arc::new(StringArray::from_iter_values(["a"])),
4562            ))],
4563        )
4564        .unwrap();
4565        let buffer = write_single_batch_stream(batch1.clone(), &mut dictionary_tracker);
4566
4567        // ensure we can read the stream back
4568        let mut reader = StreamReader::try_new(Cursor::new(buffer), None).unwrap();
4569        let read_batch = reader.next().unwrap().unwrap();
4570        assert_eq!(read_batch, batch1);
4571
4572        // reset the dictionary tracker so it can be used for next stream
4573        dictionary_tracker.clear();
4574
4575        // now write a 2nd stream and ensure we can also read it:
4576        let batch2 = RecordBatch::try_new(
4577            schema.clone(),
4578            vec![Arc::new(DictionaryArray::new(
4579                UInt8Array::from_iter_values([0]),
4580                Arc::new(StringArray::from_iter_values(["a"])),
4581            ))],
4582        )
4583        .unwrap();
4584        let buffer = write_single_batch_stream(batch2.clone(), &mut dictionary_tracker);
4585        let mut reader = StreamReader::try_new(Cursor::new(buffer), None).unwrap();
4586        let read_batch = reader.next().unwrap().unwrap();
4587        assert_eq!(read_batch, batch2);
4588    }
4589}