Skip to main content

arrow_ipc/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Arrow IPC File and Stream Writers
19//!
20//! # Notes
21//!
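//! [`FileWriter`] and [`StreamWriter`] have similar interfaces; however,
//! files produced by [`FileWriter`] are meant to be read by a reader that
//! supports [`Seek`]ing, because the IPC file format ends with a footer used
//! for random access.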
24//!
25//! [`Seek`]: std::io::Seek
26
27use std::cmp::min;
28use std::collections::HashMap;
29use std::io::{BufWriter, Write};
30use std::mem::size_of;
31use std::sync::Arc;
32
33use flatbuffers::FlatBufferBuilder;
34
35use arrow_array::builder::BufferBuilder;
36use arrow_array::cast::*;
37use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
38use arrow_array::*;
39use arrow_buffer::bit_util;
40use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ToByteSlice};
41use arrow_data::{ArrayData, ArrayDataBuilder, BufferSpec, layout};
42use arrow_schema::*;
43
44use crate::CONTINUATION_MARKER;
45use crate::compression::CompressionCodec;
46#[expect(deprecated)]
47pub use crate::compression::{CompressionContext, IpcWriteContext};
48use crate::convert::IpcSchemaEncoder;
49
/// IPC write options used to control the behaviour of the [`IpcDataGenerator`]
///
/// Construct with [`IpcWriteOptions::default`] or [`IpcWriteOptions::try_new`],
/// then refine with the `try_with_*` / `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct IpcWriteOptions {
    /// Padding is written after the message header and after each memory buffer
    /// so that they end on a multiple of this many bytes.
    /// Must be 8, 16, 32, or 64 - defaults to 64.
    alignment: u8,
    /// The legacy format is for releases before 0.15.0, and uses metadata V4.
    /// When set, message headers are framed with a 4-byte prefix instead of the
    /// 8-byte prefix used otherwise; only accepted together with metadata V4.
    write_legacy_ipc_format: bool,
    /// The metadata version to write. The Rust IPC writer supports V4+
    ///
    /// *Default versions per crate*
    ///
    /// When creating the default IpcWriteOptions, the following metadata versions are used:
    ///
    /// version 2.0.0: V4, with legacy format enabled
    /// version 4.0.0: V5
    metadata_version: crate::MetadataVersion,
    /// Compression, if desired. Will result in a runtime error
    /// if the corresponding feature is not enabled
    batch_compression_type: Option<crate::CompressionType>,
    /// Compression level handed to the codec. Only ZSTD accepts a level;
    /// `None` builds the codec without an explicit level.
    batch_compression_level: Option<i32>,
    /// How to handle updating dictionaries in IPC messages
    dictionary_handling: DictionaryHandling,
}
75
76/// A single buffer segment ready to be written to the output stream.
77///
78/// For the uncompressed path the original Arc-backed [`Buffer`] is stored
79/// directly (zero copy). For the compressed path the compressed bytes are
80/// owned by a scratch `Vec<u8>`.
81enum EncodedBuffer {
82    /// Uncompressed: Arc-backed reference to the original array buffer.
83    Raw(Buffer),
84    /// Compressed: owned scratch bytes produced by the codec.
85    Compressed(Vec<u8>),
86}
87
88impl EncodedBuffer {
89    fn as_slice(&self) -> &[u8] {
90        match self {
91            EncodedBuffer::Raw(b) => b.as_slice(),
92            EncodedBuffer::Compressed(v) => v.as_slice(),
93        }
94    }
95
96    fn len(&self) -> usize {
97        match self {
98            EncodedBuffer::Raw(b) => b.len(),
99            EncodedBuffer::Compressed(v) => v.len(),
100        }
101    }
102}
/// Accumulates the IPC metadata produced by [`write_array_data`].
///
/// `nodes` and `buffers` are serialised into the flatbuffer `RecordBatch` (or `DictionaryBatch`)
/// header. The companion [`IpcBodySink`] holds the actual encoded bytes.
#[derive(Default)]
struct IpcMetadataBuilder {
    /// `FieldNode` entries, serialised as the header's `nodes` vector.
    nodes: Vec<crate::FieldNode>,
    /// Buffer descriptors (offset + length of each buffer within the message body),
    /// serialised as the header's `buffers` vector.
    buffers: Vec<crate::Buffer>,
}
112
113/// Destination for the raw Arrow data bytes (the IPC message body) produced by [`write_array_data`].
114///
115/// The companion [`IpcMetadataBuilder`] accumulates the flatbuffer metadata
116/// (offset + length of each buffer in the body); together they form a complete IPC message.
117enum IpcBodySink<'a> {
118    /// Serialize buffer bytes (with padding) into a contiguous byte vec.
119    Write(&'a mut Vec<u8>),
120    /// Accumulate pre-encoded buffer segments for deferred zero-copy streaming.
121    Collect(&'a mut Vec<EncodedBuffer>),
122}
123impl<'a> IpcBodySink<'a> {
124    /// Writes the encoded buffer to the sink.
125    pub fn write(&mut self, pad_len: usize, buffer: EncodedBuffer) {
126        match self {
127            IpcBodySink::Write(vec) => {
128                vec.extend_from_slice(buffer.as_slice());
129                vec.extend_from_slice(&PADDING[..pad_len]);
130            }
131            IpcBodySink::Collect(vec) => {
132                vec.push(buffer);
133            }
134        }
135    }
136}
137
/// Per-message sizes produced by [`IpcDataGenerator::write`].
///
/// [`FileWriter`] uses these to build the Block index entries required by the IPC footer for
/// random-access reads.
struct IpcWriteMetadata {
    /// Per-dictionary `(padded_header_len, body_len)` for each dictionary batch written
    /// before the record batch, in the order they were written.
    dictionary_block_sizes: Vec<(usize, usize)>,
    /// Flatbuffer header size including the length prefix and alignment padding.
    padded_header_len: usize,
    /// Total length of the record-batch body including trailing alignment padding
    /// (does not include the header).
    body_len: usize,
}
151
152impl IpcWriteOptions {
153    /// Configures compression when writing IPC files.
154    ///
155    /// Will result in a runtime error if the corresponding feature
156    /// is not enabled
157    pub fn try_with_compression(
158        mut self,
159        batch_compression_type: Option<crate::CompressionType>,
160    ) -> Result<Self, ArrowError> {
161        self.batch_compression_type = batch_compression_type;
162
163        if self.batch_compression_type.is_some()
164            && self.metadata_version < crate::MetadataVersion::V5
165        {
166            return Err(ArrowError::InvalidArgumentError(
167                "Compression only supported in metadata v5 and above".to_string(),
168            ));
169        }
170        Ok(self)
171    }
172
173    /// Configures the compression level used when writing compressed IPC batches.
174    ///
175    /// Compression levels require metadata V5 or newer and are currently only
176    /// supported for ZSTD compression.
177    pub fn try_with_compression_level(
178        mut self,
179        batch_compression_level: Option<i32>,
180    ) -> Result<Self, ArrowError> {
181        self.batch_compression_level = batch_compression_level;
182
183        if self.batch_compression_level.is_some()
184            && self.metadata_version < crate::MetadataVersion::V5
185        {
186            return Err(ArrowError::InvalidArgumentError(
187                "Compression only supported in metadata v5 and above".to_string(),
188            ));
189        }
190
191        match (self.batch_compression_type, self.batch_compression_level) {
192            (Some(crate::CompressionType::ZSTD), Some(level)) => {
193                return self.check_zstd_level(level);
194            }
195            (Some(crate::CompressionType::LZ4_FRAME), Some(_)) => {
196                return Err(ArrowError::InvalidArgumentError(
197                    "LZ4 Frame compression does not support configurable compression levels"
198                        .to_string(),
199                ));
200            }
201            _ => {}
202        }
203
204        Ok(self)
205    }
206
207    #[cfg(not(feature = "zstd"))]
208    fn check_zstd_level(self, _level: i32) -> Result<Self, ArrowError> {
209        Err(ArrowError::InvalidArgumentError(
210            "zstd IPC compression requires the zstd feature".to_string(),
211        ))
212    }
213
214    #[cfg(feature = "zstd")]
215    fn check_zstd_level(self, level: i32) -> Result<Self, ArrowError> {
216        let range = zstd::compression_level_range();
217        if !range.contains(&(level as zstd::zstd_safe::CompressionLevel)) {
218            return Err(ArrowError::InvalidArgumentError(format!(
219                "ZSTD compression level must be between {} and {}, got {}",
220                range.start(),
221                range.end(),
222                level,
223            )));
224        }
225
226        Ok(self)
227    }
228
229    /// Try to create IpcWriteOptions, checking for incompatible settings
230    pub fn try_new(
231        alignment: usize,
232        write_legacy_ipc_format: bool,
233        metadata_version: crate::MetadataVersion,
234    ) -> Result<Self, ArrowError> {
235        let is_alignment_valid =
236            alignment == 8 || alignment == 16 || alignment == 32 || alignment == 64;
237        if !is_alignment_valid {
238            return Err(ArrowError::InvalidArgumentError(
239                "Alignment should be 8, 16, 32, or 64.".to_string(),
240            ));
241        }
242        let alignment: u8 = u8::try_from(alignment).expect("range already checked");
243        match metadata_version {
244            crate::MetadataVersion::V1
245            | crate::MetadataVersion::V2
246            | crate::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
247                "Writing IPC metadata version 3 and lower not supported".to_string(),
248            )),
249            #[allow(deprecated)]
250            crate::MetadataVersion::V4 => Ok(Self {
251                alignment,
252                write_legacy_ipc_format,
253                metadata_version,
254                batch_compression_type: None,
255                batch_compression_level: None,
256                dictionary_handling: DictionaryHandling::default(),
257            }),
258            crate::MetadataVersion::V5 => {
259                if write_legacy_ipc_format {
260                    Err(ArrowError::InvalidArgumentError(
261                        "Legacy IPC format only supported on metadata version 4".to_string(),
262                    ))
263                } else {
264                    Ok(Self {
265                        alignment,
266                        write_legacy_ipc_format,
267                        metadata_version,
268                        batch_compression_type: None,
269                        batch_compression_level: None,
270                        dictionary_handling: DictionaryHandling::default(),
271                    })
272                }
273            }
274            z => Err(ArrowError::InvalidArgumentError(format!(
275                "Unsupported crate::MetadataVersion {z:?}"
276            ))),
277        }
278    }
279
280    /// Configure how dictionaries are handled in IPC messages
281    pub fn with_dictionary_handling(mut self, dictionary_handling: DictionaryHandling) -> Self {
282        self.dictionary_handling = dictionary_handling;
283        self
284    }
285}
286
287impl Default for IpcWriteOptions {
288    fn default() -> Self {
289        Self {
290            alignment: 64,
291            write_legacy_ipc_format: false,
292            metadata_version: crate::MetadataVersion::V5,
293            batch_compression_type: None,
294            batch_compression_level: None,
295            dictionary_handling: DictionaryHandling::default(),
296        }
297    }
298}
299
#[derive(Debug, Default)]
/// Handles low-level details of encoding [`Array`] and [`Schema`] into the
/// [Arrow IPC Format].
///
/// The generator itself holds no state; per-stream state lives in the
/// [`DictionaryTracker`] and [`IpcWriteContext`] passed to its methods.
///
/// # Example
/// ```
/// # fn run() {
/// # use std::sync::Arc;
/// # use arrow_array::UInt64Array;
/// # use arrow_array::RecordBatch;
/// # use arrow_ipc::writer::{IpcWriteContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
///
/// // Create a record batch
/// let batch = RecordBatch::try_from_iter(vec![
///  ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _)
/// ]).unwrap();
///
/// // Error if dictionary ids are replaced.
/// let error_on_replacement = true;
/// let options = IpcWriteOptions::default();
/// let mut dictionary_tracker = DictionaryTracker::new(error_on_replacement);
///
/// let mut ipc_write_context = IpcWriteContext::default();
///
/// // encode the batch into zero or more encoded dictionaries
/// // and the data for the actual array.
/// let data_gen = IpcDataGenerator::default();
/// let (encoded_dictionaries, encoded_message) = data_gen
///   .encode(&batch, &mut dictionary_tracker, &options, &mut ipc_write_context)
///   .unwrap();
/// # }
/// ```
///
/// [Arrow IPC Format]: https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc
pub struct IpcDataGenerator {}
335
336impl IpcDataGenerator {
337    /// Converts a schema to an IPC message along with `dictionary_tracker`
338    /// and returns it encoded inside [EncodedData] as a flatbuffer.
339    pub fn schema_to_bytes_with_dictionary_tracker(
340        &self,
341        schema: &Schema,
342        dictionary_tracker: &mut DictionaryTracker,
343        write_options: &IpcWriteOptions,
344    ) -> EncodedData {
345        let mut fbb = FlatBufferBuilder::new();
346        let schema = {
347            let fb = IpcSchemaEncoder::new()
348                .with_dictionary_tracker(dictionary_tracker)
349                .schema_to_fb_offset(&mut fbb, schema);
350            fb.as_union_value()
351        };
352
353        let mut message = crate::MessageBuilder::new(&mut fbb);
354        message.add_version(write_options.metadata_version);
355        message.add_header_type(crate::MessageHeader::Schema);
356        message.add_bodyLength(0);
357        message.add_header(schema);
358        // TODO: custom metadata
359        let data = message.finish();
360        fbb.finish(data, None);
361
362        let data = fbb.finished_data();
363        EncodedData {
364            ipc_message: data.to_vec(),
365            arrow_data: vec![],
366        }
367    }
368
369    fn _encode_dictionaries<I: Iterator<Item = i64>>(
370        &self,
371        column: &ArrayRef,
372        encoded_dictionaries: &mut Vec<EncodedData>,
373        dictionary_tracker: &mut DictionaryTracker,
374        write_options: &IpcWriteOptions,
375        dict_id: &mut I,
376        ipc_write_context: &mut IpcWriteContext,
377    ) -> Result<(), ArrowError> {
378        match column.data_type() {
379            DataType::Struct(fields) => {
380                let s = as_struct_array(column);
381                for (field, column) in fields.iter().zip(s.columns()) {
382                    self.encode_dictionaries(
383                        field,
384                        column,
385                        encoded_dictionaries,
386                        dictionary_tracker,
387                        write_options,
388                        dict_id,
389                        ipc_write_context,
390                    )?;
391                }
392            }
393            DataType::RunEndEncoded(_, values) => {
394                let data = column.to_data();
395                if data.child_data().len() != 2 {
396                    return Err(ArrowError::InvalidArgumentError(format!(
397                        "The run encoded array should have exactly two child arrays. Found {}",
398                        data.child_data().len()
399                    )));
400                }
401                // The run_ends array is not expected to be dictionary encoded. Hence encode dictionaries
402                // only for values array.
403                let values_array = make_array(data.child_data()[1].clone());
404                self.encode_dictionaries(
405                    values,
406                    &values_array,
407                    encoded_dictionaries,
408                    dictionary_tracker,
409                    write_options,
410                    dict_id,
411                    ipc_write_context,
412                )?;
413            }
414            DataType::List(field) => {
415                let list = as_list_array(column);
416                self.encode_dictionaries(
417                    field,
418                    list.values(),
419                    encoded_dictionaries,
420                    dictionary_tracker,
421                    write_options,
422                    dict_id,
423                    ipc_write_context,
424                )?;
425            }
426            DataType::LargeList(field) => {
427                let list = as_large_list_array(column);
428                self.encode_dictionaries(
429                    field,
430                    list.values(),
431                    encoded_dictionaries,
432                    dictionary_tracker,
433                    write_options,
434                    dict_id,
435                    ipc_write_context,
436                )?;
437            }
438            DataType::ListView(field) => {
439                let list = column.as_list_view::<i32>();
440                self.encode_dictionaries(
441                    field,
442                    list.values(),
443                    encoded_dictionaries,
444                    dictionary_tracker,
445                    write_options,
446                    dict_id,
447                    ipc_write_context,
448                )?;
449            }
450            DataType::LargeListView(field) => {
451                let list = column.as_list_view::<i64>();
452                self.encode_dictionaries(
453                    field,
454                    list.values(),
455                    encoded_dictionaries,
456                    dictionary_tracker,
457                    write_options,
458                    dict_id,
459                    ipc_write_context,
460                )?;
461            }
462            DataType::FixedSizeList(field, _) => {
463                let list = column
464                    .as_any()
465                    .downcast_ref::<FixedSizeListArray>()
466                    .expect("Unable to downcast to fixed size list array");
467                self.encode_dictionaries(
468                    field,
469                    list.values(),
470                    encoded_dictionaries,
471                    dictionary_tracker,
472                    write_options,
473                    dict_id,
474                    ipc_write_context,
475                )?;
476            }
477            DataType::Map(field, _) => {
478                let map_array = as_map_array(column);
479
480                let (keys, values) = match field.data_type() {
481                    DataType::Struct(fields) if fields.len() == 2 => (&fields[0], &fields[1]),
482                    _ => panic!("Incorrect field data type {:?}", field.data_type()),
483                };
484
485                // keys
486                self.encode_dictionaries(
487                    keys,
488                    map_array.keys(),
489                    encoded_dictionaries,
490                    dictionary_tracker,
491                    write_options,
492                    dict_id,
493                    ipc_write_context,
494                )?;
495
496                // values
497                self.encode_dictionaries(
498                    values,
499                    map_array.values(),
500                    encoded_dictionaries,
501                    dictionary_tracker,
502                    write_options,
503                    dict_id,
504                    ipc_write_context,
505                )?;
506            }
507            DataType::Union(fields, _) => {
508                let union = as_union_array(column);
509                for (type_id, field) in fields.iter() {
510                    let column = union.child(type_id);
511                    self.encode_dictionaries(
512                        field,
513                        column,
514                        encoded_dictionaries,
515                        dictionary_tracker,
516                        write_options,
517                        dict_id,
518                        ipc_write_context,
519                    )?;
520                }
521            }
522            _ => (),
523        }
524
525        Ok(())
526    }
527
528    #[allow(clippy::too_many_arguments)]
529    fn encode_dictionaries<I: Iterator<Item = i64>>(
530        &self,
531        field: &Field,
532        column: &ArrayRef,
533        encoded_dictionaries: &mut Vec<EncodedData>,
534        dictionary_tracker: &mut DictionaryTracker,
535        write_options: &IpcWriteOptions,
536        dict_id_seq: &mut I,
537        ipc_write_context: &mut IpcWriteContext,
538    ) -> Result<(), ArrowError> {
539        match column.data_type() {
540            DataType::Dictionary(_key_type, _value_type) => {
541                let dict_data = column.to_data();
542                let dict_values = &dict_data.child_data()[0];
543
544                let values = make_array(dict_data.child_data()[0].clone());
545
546                self._encode_dictionaries(
547                    &values,
548                    encoded_dictionaries,
549                    dictionary_tracker,
550                    write_options,
551                    dict_id_seq,
552                    ipc_write_context,
553                )?;
554
555                // It's important to only take the dict_id at this point, because the dict ID
556                // sequence is assigned depth-first, so we need to first encode children and have
557                // them take their assigned dict IDs before we take the dict ID for this field.
558                let dict_id = dict_id_seq.next().ok_or_else(|| {
559                    ArrowError::IpcError(format!(
560                        "no dict id for field {:?}: field.data_type={:?}, column.data_type={:?}",
561                        field.name(),
562                        field.data_type(),
563                        column.data_type()
564                    ))
565                })?;
566
567                match dictionary_tracker.insert_column(
568                    dict_id,
569                    column,
570                    write_options.dictionary_handling,
571                )? {
572                    DictionaryUpdate::None => {}
573                    DictionaryUpdate::New | DictionaryUpdate::Replaced => {
574                        encoded_dictionaries.push(self.dictionary_batch_to_bytes(
575                            dict_id,
576                            dict_values,
577                            write_options,
578                            false,
579                            ipc_write_context,
580                        )?);
581                    }
582                    DictionaryUpdate::Delta(data) => {
583                        encoded_dictionaries.push(self.dictionary_batch_to_bytes(
584                            dict_id,
585                            &data,
586                            write_options,
587                            true,
588                            ipc_write_context,
589                        )?);
590                    }
591                }
592            }
593            _ => self._encode_dictionaries(
594                column,
595                encoded_dictionaries,
596                dictionary_tracker,
597                write_options,
598                dict_id_seq,
599                ipc_write_context,
600            )?,
601        }
602
603        Ok(())
604    }
605
606    /// Encodes a batch to a number of [EncodedData] items (dictionary batches + the record batch).
607    /// The [DictionaryTracker] keeps track of dictionaries with new `dict_id`s  (so they are only sent once)
608    /// Make sure the [DictionaryTracker] is initialized at the start of the stream.
609    pub fn encode(
610        &self,
611        batch: &RecordBatch,
612        dictionary_tracker: &mut DictionaryTracker,
613        write_options: &IpcWriteOptions,
614        ipc_write_context: &mut IpcWriteContext,
615    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
616        let encoded_dictionaries =
617            self.encode_all_dicts(batch, dictionary_tracker, write_options, ipc_write_context)?;
618        let mut arrow_data = Vec::new();
619        let (ipc_message, _, tail_pad) = self.record_batch_to_bytes(
620            batch,
621            write_options,
622            ipc_write_context,
623            &mut IpcBodySink::Write(&mut arrow_data),
624        )?;
625        arrow_data.extend_from_slice(&PADDING[..tail_pad]);
626        Ok((
627            encoded_dictionaries,
628            EncodedData {
629                ipc_message,
630                arrow_data,
631            },
632        ))
633    }
634
635    /// Encode dictionary batches for all columns in `batch`.
636    fn encode_all_dicts(
637        &self,
638        batch: &RecordBatch,
639        dictionary_tracker: &mut DictionaryTracker,
640        write_options: &IpcWriteOptions,
641        ipc_write_context: &mut IpcWriteContext,
642    ) -> Result<Vec<EncodedData>, ArrowError> {
643        let schema = batch.schema();
644        let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len());
645        let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter();
646        for (i, field) in schema.fields().iter().enumerate() {
647            self.encode_dictionaries(
648                field,
649                batch.column(i),
650                &mut encoded_dictionaries,
651                dictionary_tracker,
652                write_options,
653                &mut dict_id,
654                ipc_write_context,
655            )?;
656        }
657        Ok(encoded_dictionaries)
658    }
659
660    /// Write dictionary batches and the record batch directly to `writer`, skipping the
661    /// intermediate body `Vec<u8>` allocations
662    /// Returns [`IpcWriteMetadata`] with the sizes needed to build footer blocks.
663    fn write<W: Write>(
664        &self,
665        batch: &RecordBatch,
666        dictionary_tracker: &mut DictionaryTracker,
667        write_options: &IpcWriteOptions,
668        ipc_write_context: &mut IpcWriteContext,
669        writer: &mut W,
670    ) -> Result<IpcWriteMetadata, ArrowError> {
671        let encoded_dictionaries =
672            self.encode_all_dicts(batch, dictionary_tracker, write_options, ipc_write_context)?;
673
674        let mut dictionary_block_sizes = Vec::with_capacity(encoded_dictionaries.len());
675        for dict in encoded_dictionaries {
676            dictionary_block_sizes.push(write_message(&mut *writer, dict, write_options)?);
677        }
678
679        let capacity = batch
680            .columns()
681            .iter()
682            .map(|a| estimate_encoded_buffer_count(a.data_type()))
683            .sum();
684        let mut encoded_buffers: Vec<EncodedBuffer> = Vec::with_capacity(capacity);
685        let (ipc_message, body_len, tail_pad) = self.record_batch_to_bytes(
686            batch,
687            write_options,
688            ipc_write_context,
689            &mut IpcBodySink::Collect(&mut encoded_buffers),
690        )?;
691
692        let alignment = write_options.alignment;
693        let a = usize::from(alignment - 1);
694        let prefix_size = if write_options.write_legacy_ipc_format {
695            4
696        } else {
697            8
698        };
699        let aligned_size = (ipc_message.len() + prefix_size + a) & !a;
700        write_continuation(
701            &mut *writer,
702            write_options,
703            (aligned_size - prefix_size) as i32,
704        )?;
705        writer.write_all(&ipc_message)?;
706        writer.write_all(&PADDING[..aligned_size - ipc_message.len() - prefix_size])?;
707        for enc in &encoded_buffers {
708            writer.write_all(enc.as_slice())?;
709            writer.write_all(&PADDING[..pad_to_alignment(alignment, enc.len())])?;
710        }
711        writer.write_all(&PADDING[..tail_pad])?;
712
713        Ok(IpcWriteMetadata {
714            dictionary_block_sizes,
715            padded_header_len: aligned_size,
716            body_len,
717        })
718    }
719
720    /// Encodes a batch to a number of [EncodedData] items (dictionary batches + the record batch).
721    /// The [DictionaryTracker] keeps track of dictionaries with new `dict_id`s  (so they are only sent once)
722    /// Make sure the [DictionaryTracker] is initialized at the start of the stream.
723    #[deprecated(since = "57.0.0", note = "Use `encode` instead")]
724    pub fn encoded_batch(
725        &self,
726        batch: &RecordBatch,
727        dictionary_tracker: &mut DictionaryTracker,
728        write_options: &IpcWriteOptions,
729    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
730        self.encode(
731            batch,
732            dictionary_tracker,
733            write_options,
734            &mut Default::default(),
735        )
736    }
737
738    /// Encodes a `RecordBatch` into a flatbuffer IPC message and fills `sink` with the
739    /// serialised buffer data.
740    ///
741    /// Returns `(ipc_message, body_len, tail_pad)`: the flatbuffer header bytes, the
742    /// total body length including trailing padding, and the trailing alignment padding byte count.
743    fn record_batch_to_bytes(
744        &self,
745        batch: &RecordBatch,
746        write_options: &IpcWriteOptions,
747        ipc_write_context: &mut IpcWriteContext,
748        sink: &mut IpcBodySink<'_>,
749    ) -> Result<(Vec<u8>, usize, usize), ArrowError> {
750        let batch_compression_type = write_options.batch_compression_type;
751
752        let compression = batch_compression_type.map(|batch_compression_type| {
753            let fbb = ipc_write_context.mut_fbb();
754            let mut c = crate::BodyCompressionBuilder::new(fbb);
755            c.add_method(crate::BodyCompressionMethod::BUFFER);
756            c.add_codec(batch_compression_type);
757            c.finish()
758        });
759
760        let batch_compression_level = write_options.batch_compression_level;
761        let compression_codec: Option<CompressionCodec> = batch_compression_type
762            .map(|compression_type| match batch_compression_level {
763                Some(level) => {
764                    CompressionCodec::try_new_with_compression_level(compression_type, level)
765                }
766                None => compression_type.try_into(),
767            })
768            .transpose()?;
769
770        let alignment = write_options.alignment;
771        let mut variadic_buffer_counts = vec![];
772        let mut meta = IpcMetadataBuilder::default();
773        let mut offset = 0i64;
774
775        for array in batch.columns() {
776            let array_data = array.to_data();
777            offset = write_array_data(
778                &array_data,
779                &mut meta,
780                sink,
781                offset,
782                compression_codec,
783                ipc_write_context,
784                write_options,
785            )?;
786            append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data);
787        }
788
789        let tail_pad = pad_to_alignment(alignment, offset as usize);
790        let body_len = offset as usize + tail_pad;
791
792        let fbb = ipc_write_context.mut_fbb();
793        let buffers = fbb.create_vector(&meta.buffers);
794        let nodes = fbb.create_vector(&meta.nodes);
795        let variadic_buffer = if variadic_buffer_counts.is_empty() {
796            None
797        } else {
798            Some(fbb.create_vector(&variadic_buffer_counts))
799        };
800
801        let root = {
802            let mut batch_builder = crate::RecordBatchBuilder::new(fbb);
803            batch_builder.add_length(batch.num_rows() as i64);
804            batch_builder.add_nodes(nodes);
805            batch_builder.add_buffers(buffers);
806            if let Some(c) = compression {
807                batch_builder.add_compression(c);
808            }
809            if let Some(v) = variadic_buffer {
810                batch_builder.add_variadicBufferCounts(v);
811            }
812            batch_builder.finish().as_union_value()
813        };
814        let mut message = crate::MessageBuilder::new(fbb);
815        message.add_version(write_options.metadata_version);
816        message.add_header_type(crate::MessageHeader::RecordBatch);
817        message.add_bodyLength(body_len as i64);
818        message.add_header(root);
819        let root = message.finish();
820        fbb.finish(root, None);
821
822        let ipc_message = fbb.finished_data().to_vec();
823        fbb.reset();
824        Ok((ipc_message, body_len, tail_pad))
825    }
826
    /// Encode a single dictionary as an IPC `DictionaryBatch` message.
    ///
    /// Write dictionary values into two sets of bytes, one for the header (crate::Message) and the
    /// other for the data.
    ///
    /// * `dict_id` - the dictionary ID stored in the `DictionaryBatch` header.
    /// * `array_data` - the dictionary values, encoded as the body of the inner `RecordBatch`.
    /// * `write_options` - supplies the compression type and level, the body alignment and the
    ///   metadata version.
    /// * `is_delta` - stored as the `isDelta` flag of the `DictionaryBatch`.
    /// * `ipc_write_context` - provides the reusable flatbuffer builder (via `mut_fbb`) and is
    ///   passed through to `write_array_data`; the builder is reset once the message bytes
    ///   have been copied out.
    ///
    /// Returns an [`EncodedData`] whose `ipc_message` holds the finished flatbuffer `Message`
    /// and whose `arrow_data` holds the body bytes, padded at the tail to
    /// `write_options.alignment`.
    ///
    /// # Errors
    ///
    /// Returns an error if the compression codec cannot be created or if writing the array
    /// buffers fails.
    fn dictionary_batch_to_bytes(
        &self,
        dict_id: i64,
        array_data: &ArrayData,
        write_options: &IpcWriteOptions,
        is_delta: bool,
        ipc_write_context: &mut IpcWriteContext,
    ) -> Result<EncodedData, ArrowError> {
        // Body bytes for the dictionary values are accumulated here.
        let mut arrow_data: Vec<u8> = vec![];

        // get the type of compression
        let batch_compression_type = write_options.batch_compression_type;

        // The optional BodyCompression table is created up front, before the RecordBatch
        // builder below is started: flatbuffers requires child objects to be finished before
        // the parent table that references them is begun.
        let compression = batch_compression_type.map(|batch_compression_type| {
            let fbb = ipc_write_context.mut_fbb();
            let mut c = crate::BodyCompressionBuilder::new(fbb);
            c.add_method(crate::BodyCompressionMethod::BUFFER);
            c.add_codec(batch_compression_type);
            c.finish()
        });

        // Build the codec used to compress each buffer, honouring an explicit compression
        // level when one is configured.
        let batch_compression_level = write_options.batch_compression_level;
        let compression_codec: Option<CompressionCodec> = batch_compression_type
            .map(|batch_compression_type| match batch_compression_level {
                Some(level) => {
                    CompressionCodec::try_new_with_compression_level(batch_compression_type, level)
                }
                None => batch_compression_type.try_into(),
            })
            .transpose()?;

        let alignment = write_options.alignment;
        let mut meta = IpcMetadataBuilder::default();
        let mut sink = IpcBodySink::Write(&mut arrow_data);
        // Write the values' buffers into `arrow_data` starting at body offset 0, collecting
        // field nodes and buffer descriptors in `meta`. The returned `offset` is the body
        // length before tail padding.
        let offset = write_array_data(
            array_data,
            &mut meta,
            &mut sink,
            0,
            compression_codec,
            ipc_write_context,
            write_options,
        )?;

        let mut variadic_buffer_counts = vec![];
        append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);

        // pad the tail of body data
        let tail_pad = pad_to_alignment(alignment, offset as usize);
        let body_len = offset as usize + tail_pad;
        arrow_data.extend_from_slice(&PADDING[..tail_pad]);

        // All vectors must be created before the tables that reference them are started.
        let fbb = ipc_write_context.mut_fbb();
        let buffers = fbb.create_vector(&meta.buffers);
        let nodes = fbb.create_vector(&meta.nodes);
        // The variadicBufferCounts field is only emitted when view arrays are present.
        let variadic_buffer = if variadic_buffer_counts.is_empty() {
            None
        } else {
            Some(fbb.create_vector(&variadic_buffer_counts))
        };

        // Inner RecordBatch table describing the dictionary values.
        let root = {
            let mut batch_builder = crate::RecordBatchBuilder::new(fbb);
            batch_builder.add_length(array_data.len() as i64);
            batch_builder.add_nodes(nodes);
            batch_builder.add_buffers(buffers);
            if let Some(c) = compression {
                batch_builder.add_compression(c);
            }
            if let Some(v) = variadic_buffer {
                batch_builder.add_variadicBufferCounts(v);
            }
            batch_builder.finish()
        };

        // Wrap the RecordBatch in a DictionaryBatch carrying the ID and delta flag.
        let root = {
            let mut batch_builder = crate::DictionaryBatchBuilder::new(fbb);
            batch_builder.add_id(dict_id);
            batch_builder.add_data(root);
            batch_builder.add_isDelta(is_delta);
            batch_builder.finish().as_union_value()
        };

        // Top-level Message whose header is the DictionaryBatch union value.
        let root = {
            let mut message_builder = crate::MessageBuilder::new(fbb);
            message_builder.add_version(write_options.metadata_version);
            message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
            message_builder.add_bodyLength(body_len as i64);
            message_builder.add_header(root);
            message_builder.finish()
        };

        // Copy the finished bytes out, then reset the shared builder for the next message.
        fbb.finish(root, None);
        let ipc_message = fbb.finished_data().to_vec();
        fbb.reset();

        Ok(EncodedData {
            ipc_message,
            arrow_data,
        })
    }
930}
931
932fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
933    match array.data_type() {
934        DataType::BinaryView | DataType::Utf8View => {
935            // The spec documents the counts only includes the variadic buffers, not the view/null buffers.
936            // https://arrow.apache.org/docs/format/Columnar.html#variadic-buffers
937            counts.push(array.buffers().len() as i64 - 1);
938        }
939        DataType::Dictionary(_, _) => {
940            // Do nothing
941            // Dictionary types are handled in `encode_dictionaries`.
942        }
943        _ => {
944            for child in array.child_data() {
945                append_variadic_buffer_counts(counts, child)
946            }
947        }
948    }
949}
950
951pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError> {
952    match arr.data_type() {
953        DataType::RunEndEncoded(k, _) => match k.data_type() {
954            DataType::Int16 => {
955                Ok(into_zero_offset_run_array(RunArray::<Int16Type>::from(arr))?.into_data())
956            }
957            DataType::Int32 => {
958                Ok(into_zero_offset_run_array(RunArray::<Int32Type>::from(arr))?.into_data())
959            }
960            DataType::Int64 => {
961                Ok(into_zero_offset_run_array(RunArray::<Int64Type>::from(arr))?.into_data())
962            }
963            d => unreachable!("Unexpected data type {d}"),
964        },
965        d => Err(ArrowError::InvalidArgumentError(format!(
966            "The given array is not a run array. Data type of given array: {d}"
967        ))),
968    }
969}
970
/// Returns a `RunArray` with zero offset and length matching the last value
/// in the run_ends array.
///
/// A sliced run array keeps its original `run_ends` and `values` children and only records a
/// logical offset and length. This function rebuilds both children so that only the physical
/// runs overlapping the slice remain, with run ends rebased to the slice start. An array that
/// already has zero offset, and whose largest run end equals its logical length, is returned
/// unchanged.
fn into_zero_offset_run_array<R: RunEndIndexType>(
    run_array: RunArray<R>,
) -> Result<RunArray<R>, ArrowError> {
    let run_ends = run_array.run_ends();
    // Fast path: nothing is trimmed at either end, so the array is already in canonical form.
    if run_ends.offset() == 0 && run_ends.max_value() == run_ends.len() {
        return Ok(run_array);
    }

    // The physical index of original run_ends array from which the `ArrayData`is sliced.
    let start_physical_index = run_ends.get_start_physical_index();

    // The physical index of original run_ends array until which the `ArrayData`is sliced.
    let end_physical_index = run_ends.get_end_physical_index();

    // Both physical indices are inclusive, hence the `+ 1`.
    let physical_length = end_physical_index - start_physical_index + 1;

    // build new run_ends array by subtracting offset from run ends.
    let offset = R::Native::usize_as(run_ends.offset());
    let mut builder = BufferBuilder::<R::Native>::new(physical_length);
    // Every run except the last is rebased by the logical offset...
    for run_end_value in &run_ends.values()[start_physical_index..end_physical_index] {
        builder.append(run_end_value.sub_wrapping(offset));
    }
    // ...while the last run end is set to the logical length, because the slice may stop
    // partway through that run. The conversion is expected to succeed: the logical length
    // never exceeds the original last run end, which already fit in `R::Native`.
    builder.append(R::Native::from_usize(run_array.len()).unwrap());
    let new_run_ends = unsafe {
        // SAFETY:
        // The buffer holds exactly `physical_length` values (`end - start` rebased run ends
        // plus the final logical length), which matches `.len(physical_length)`. Rebasing by a
        // single offset preserves the ordering of the original run ends.
        ArrayDataBuilder::new(R::DATA_TYPE)
            .len(physical_length)
            .add_buffer(builder.finish())
            .build_unchecked()
    };

    // build new values by slicing physical indices.
    let new_values = run_array
        .values()
        .slice(start_physical_index, physical_length)
        .into_data();

    let builder = ArrayDataBuilder::new(run_array.data_type().clone())
        .len(run_array.len())
        .add_child_data(new_run_ends)
        .add_child_data(new_values);
    let array_data = unsafe {
        // SAFETY:
        // The run ends and values children both have `physical_length` entries, and the
        // logical length equals the final run end written above.
        builder.build_unchecked()
    };
    Ok(array_data.into())
}
1022
/// Controls how dictionaries are handled in Arrow IPC messages
///
/// This only matters when a dictionary ID has already been written and a batch arrives with
/// different dictionary values (see [`DictionaryTracker::insert_column`]). Identical
/// dictionaries are never re-sent.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DictionaryHandling {
    /// Send the entire dictionary every time it changes (default)
    #[default]
    Resend,
    /// Send only new dictionary values since the last batch (delta encoding)
    ///
    /// When a dictionary is first encountered, the entire dictionary is sent.
    /// For subsequent batches, only values that are new (not previously sent)
    /// are transmitted with the `isDelta` flag set to true.
    ///
    /// A delta is only possible when the previously sent values are an exact prefix of the
    /// new dictionary. Otherwise the whole dictionary is replaced, exactly as with
    /// [`DictionaryHandling::Resend`].
    Delta,
}
1036
/// Describes what kind of update took place after a call to
/// [`DictionaryTracker::insert_column`].
#[derive(Debug, Clone)]
pub enum DictionaryUpdate {
    /// No dictionary was written, the dictionary was identical to what was already
    /// in the tracker.
    None,
    /// No dictionary was previously present in the tracker for this ID; the new one has
    /// been recorded.
    New,
    /// Dictionary was replaced with the new data
    Replaced,
    /// Dictionary was updated, ArrayData is the delta between old and new: the values
    /// appended after the previously recorded ones.
    Delta(ArrayData),
}
1050
/// Keeps track of dictionaries that have been written, to avoid emitting the same dictionary
/// multiple times.
///
/// Can optionally error if an update to an existing dictionary is attempted, which
/// isn't allowed in the `FileWriter`.
#[derive(Debug)]
pub struct DictionaryTracker {
    // NOTE: When adding fields, update the clear() method accordingly.
    /// The most recently recorded dictionary data for each dictionary ID.
    written: HashMap<i64, ArrayData>,
    /// Dictionary IDs in the order they were handed out by `next_dict_id`.
    dict_ids: Vec<i64>,
    /// When true, changing an already-recorded dictionary is reported as an error.
    error_on_replacement: bool,
}
1063
1064impl DictionaryTracker {
1065    /// Create a new [`DictionaryTracker`].
1066    ///
1067    /// If `error_on_replacement`
1068    /// is true, an error will be generated if an update to an
1069    /// existing dictionary is attempted.
1070    pub fn new(error_on_replacement: bool) -> Self {
1071        #[allow(deprecated)]
1072        Self {
1073            written: HashMap::new(),
1074            dict_ids: Vec::new(),
1075            error_on_replacement,
1076        }
1077    }
1078
1079    /// Record and return the next dictionary ID.
1080    pub fn next_dict_id(&mut self) -> i64 {
1081        let next = self
1082            .dict_ids
1083            .last()
1084            .copied()
1085            .map(|i| i + 1)
1086            .unwrap_or_default();
1087
1088        self.dict_ids.push(next);
1089        next
1090    }
1091
1092    /// Return the sequence of dictionary IDs in the order they should be observed while
1093    /// traversing the schema
1094    pub fn dict_id(&mut self) -> &[i64] {
1095        &self.dict_ids
1096    }
1097
1098    /// Keep track of the dictionary with the given ID and values. Behavior:
1099    ///
1100    /// * If this ID has been written already and has the same data, return `Ok(false)` to indicate
1101    ///   that the dictionary was not actually inserted (because it's already been seen).
1102    /// * If this ID has been written already but with different data, and this tracker is
1103    ///   configured to return an error, return an error.
1104    /// * If the tracker has not been configured to error on replacement or this dictionary
1105    ///   has never been seen before, return `Ok(true)` to indicate that the dictionary was just
1106    ///   inserted.
1107    #[deprecated(since = "56.1.0", note = "Use `insert_column` instead")]
1108    pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> {
1109        let dict_data = column.to_data();
1110        let dict_values = &dict_data.child_data()[0];
1111
1112        // If a dictionary with this id was already emitted, check if it was the same.
1113        if let Some(last) = self.written.get(&dict_id) {
1114            if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
1115                // Same dictionary values => no need to emit it again
1116                return Ok(false);
1117            }
1118            if self.error_on_replacement {
1119                // If error on replacement perform a logical comparison
1120                if last.child_data()[0] == *dict_values {
1121                    // Same dictionary values => no need to emit it again
1122                    return Ok(false);
1123                }
1124                return Err(ArrowError::InvalidArgumentError(
1125                    "Dictionary replacement detected when writing IPC file format. \
1126                     Arrow IPC files only support a single dictionary for a given field \
1127                     across all batches."
1128                        .to_string(),
1129                ));
1130            }
1131        }
1132
1133        self.written.insert(dict_id, dict_data);
1134        Ok(true)
1135    }
1136
1137    /// Keep track of the dictionary with the given ID and values. The return
1138    /// value indicates what, if any, update to the internal map took place
1139    /// and how it should be interpreted based on the `dict_handling` parameter.
1140    ///
1141    /// # Returns
1142    ///
1143    /// * `Ok(Dictionary::New)` - If the dictionary was not previously written
1144    /// * `Ok(Dictionary::Replaced)` - If the dictionary was previously written
1145    ///   with completely different data, or if the data is a delta of the existing,
1146    ///   but with `dict_handling` set to `DictionaryHandling::Resend`
1147    /// * `Ok(Dictionary::Delta)` - If the dictionary was previously written, but
1148    ///   the new data is a delta of the old and the `dict_handling` is set to
1149    ///   `DictionaryHandling::Delta`
1150    /// * `Err(e)` - If the dictionary was previously written with different data,
1151    ///   and `error_on_replacement` is set to `true`.
1152    pub fn insert_column(
1153        &mut self,
1154        dict_id: i64,
1155        column: &ArrayRef,
1156        dict_handling: DictionaryHandling,
1157    ) -> Result<DictionaryUpdate, ArrowError> {
1158        let new_data = column.to_data();
1159        let new_values = &new_data.child_data()[0];
1160
1161        // If there is no existing dictionary with this ID, we always insert
1162        let Some(old) = self.written.get(&dict_id) else {
1163            self.written.insert(dict_id, new_data);
1164            return Ok(DictionaryUpdate::New);
1165        };
1166
1167        // Fast path - If the array data points to the same buffer as the
1168        // existing then they're the same.
1169        let old_values = &old.child_data()[0];
1170        if ArrayData::ptr_eq(old_values, new_values) {
1171            return Ok(DictionaryUpdate::None);
1172        }
1173
1174        // Slow path - Compare the dictionaries value by value
1175        let comparison = compare_dictionaries(old_values, new_values);
1176        if matches!(comparison, DictionaryComparison::Equal) {
1177            return Ok(DictionaryUpdate::None);
1178        }
1179
1180        const REPLACEMENT_ERROR: &str = "Dictionary replacement detected when writing IPC file format. \
1181                 Arrow IPC files only support a single dictionary for a given field \
1182                 across all batches.";
1183
1184        match comparison {
1185            DictionaryComparison::NotEqual => {
1186                if self.error_on_replacement {
1187                    return Err(ArrowError::InvalidArgumentError(
1188                        REPLACEMENT_ERROR.to_string(),
1189                    ));
1190                }
1191
1192                self.written.insert(dict_id, new_data);
1193                Ok(DictionaryUpdate::Replaced)
1194            }
1195            DictionaryComparison::Delta => match dict_handling {
1196                DictionaryHandling::Resend => {
1197                    if self.error_on_replacement {
1198                        return Err(ArrowError::InvalidArgumentError(
1199                            REPLACEMENT_ERROR.to_string(),
1200                        ));
1201                    }
1202
1203                    self.written.insert(dict_id, new_data);
1204                    Ok(DictionaryUpdate::Replaced)
1205                }
1206                DictionaryHandling::Delta => {
1207                    let delta =
1208                        new_values.slice(old_values.len(), new_values.len() - old_values.len());
1209                    self.written.insert(dict_id, new_data);
1210                    Ok(DictionaryUpdate::Delta(delta))
1211                }
1212            },
1213            DictionaryComparison::Equal => unreachable!("Already checked equal case"),
1214        }
1215    }
1216
1217    /// Clears the state of the dictionary tracker.
1218    ///
1219    /// This allows the dictionary tracker to be reused for a new IPC stream while avoiding the
1220    /// allocation cost of creating a new instance. This method should not be called if
1221    /// the dictionary tracker will be used to continue writing to an existing IPC stream.
1222    pub fn clear(&mut self) {
1223        self.dict_ids.clear();
1224        self.written.clear();
1225    }
1226}
1227
/// Outcome of comparing a previously written dictionary with a new candidate.
#[derive(Debug, Clone)]
enum DictionaryComparison {
    /// Both dictionaries hold exactly the same values, element for element.
    Equal,
    /// The old dictionary is a strict prefix of the new one, so only the trailing
    /// values need to be transmitted.
    Delta,
    /// The new dictionary is neither identical to nor an extension of the old one.
    NotEqual,
}
1239
1240// Compares two dictionaries and returns a [`DictionaryComparison`].
1241fn compare_dictionaries(old: &ArrayData, new: &ArrayData) -> DictionaryComparison {
1242    // Check for exact match
1243    let existing_len = old.len();
1244    let new_len = new.len();
1245    if existing_len == new_len {
1246        if *old == *new {
1247            return DictionaryComparison::Equal;
1248        } else {
1249            return DictionaryComparison::NotEqual;
1250        }
1251    }
1252
1253    // Can't be a delta if the new is shorter than the existing
1254    if new_len < existing_len {
1255        return DictionaryComparison::NotEqual;
1256    }
1257
1258    // Check for delta
1259    if new.slice(0, existing_len) == *old {
1260        return DictionaryComparison::Delta;
1261    }
1262
1263    DictionaryComparison::NotEqual
1264}
1265
/// Arrow File Writer
///
/// Writes Arrow [`RecordBatch`]es in the [IPC File Format].
///
/// # See Also
///
/// * [`StreamWriter`] for writing IPC Streams
///
/// # Example
/// ```
/// # use arrow_array::record_batch;
/// # use arrow_ipc::writer::FileWriter;
/// # let mut file = vec![]; // mimic a file for the example
/// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// // create a new writer, the schema must be known in advance
/// let mut writer = FileWriter::try_new(&mut file, &batch.schema()).unwrap();
/// // write each batch to the underlying writer
/// writer.write(&batch).unwrap();
/// // When all batches are written, call finish to flush all buffers
/// writer.finish().unwrap();
/// ```
/// [IPC File Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format
pub struct FileWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// A reference to the schema, used in validating record batches
    schema: SchemaRef,
    /// Absolute byte offset in the output at which the next block (dictionary or record
    /// batch message) will start; stored in each footer block for random access
    block_offsets: usize,
    /// Dictionary blocks that will be written as part of the IPC footer
    dictionary_blocks: Vec<crate::Block>,
    /// Record blocks that will be written as part of the IPC footer
    record_blocks: Vec<crate::Block>,
    /// Whether the writer footer has been written, and the writer is finished
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,
    /// User level customized metadata
    custom_metadata: HashMap<String, String>,

    /// Encodes each record batch, and the dictionaries it needs, into IPC messages
    data_gen: IpcDataGenerator,

    /// Reusable encoding state (including the flatbuffer builder) shared across messages
    ipc_write_context: IpcWriteContext,
}
1312
1313impl<W: Write> FileWriter<BufWriter<W>> {
1314    /// Try to create a new file writer with the writer wrapped in a BufWriter.
1315    ///
1316    /// See [`FileWriter::try_new`] for an unbuffered version.
1317    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1318        Self::try_new(BufWriter::new(writer), schema)
1319    }
1320}
1321
impl<W: Write> FileWriter<W> {
    /// Try to create a new writer, with the schema written as part of the header
    ///
    /// The writer uses [`IpcWriteOptions::default`]. Note the created writer is not buffered.
    /// See [`FileWriter::try_new_buffered`] for details.
    ///
    /// # Errors
    ///
    /// An [`Err`] may be returned if writing the header to the writer fails.
    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        let write_options = IpcWriteOptions::default();
        Self::try_new_with_options(writer, schema, write_options)
    }

    /// Try to create a new writer with IpcWriteOptions
    ///
    /// The file header (the Arrow magic bytes, padded to `write_options.alignment`) and the
    /// encoded schema message are written to `writer` immediately.
    ///
    /// Note the created writer is not buffered. See [`FileWriter::try_new_buffered`] for details.
    ///
    /// # Errors
    ///
    /// An [`Err`] may be returned if writing the header to the writer fails.
    pub fn try_new_with_options(
        mut writer: W,
        schema: &Schema,
        write_options: IpcWriteOptions,
    ) -> Result<Self, ArrowError> {
        let data_gen = IpcDataGenerator::default();
        // write magic to header aligned on alignment boundary
        let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
        let header_size = super::ARROW_MAGIC.len() + pad_len;
        writer.write_all(&super::ARROW_MAGIC)?;
        writer.write_all(&PADDING[..pad_len])?;
        // write the schema, set the written bytes to the schema + header
        // `true`: IPC files only support a single dictionary per field, so replacing a
        // dictionary that has already been written must be rejected.
        let mut dictionary_tracker = DictionaryTracker::new(true);
        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
            schema,
            &mut dictionary_tracker,
            &write_options,
        );
        let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
        Ok(Self {
            writer,
            write_options,
            schema: Arc::new(schema.clone()),
            // The first block starts right after the header and the schema message.
            block_offsets: meta + data + header_size,
            dictionary_blocks: vec![],
            record_blocks: vec![],
            finished: false,
            dictionary_tracker,
            custom_metadata: HashMap::new(),
            data_gen,
            ipc_write_context: IpcWriteContext::default(),
        })
    }

    /// Adds a key-value pair to the [FileWriter]'s custom metadata
    ///
    /// The metadata is written into the footer by [`FileWriter::finish`]. Inserting an existing
    /// key overwrites its previous value.
    pub fn write_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
        self.custom_metadata.insert(key.into(), value.into());
    }

    /// Write a record batch to the file
    ///
    /// The byte offsets and sizes of every dictionary message reported for this batch, followed
    /// by the record batch message itself, are recorded so they can be listed in the footer.
    ///
    /// # Errors
    ///
    /// Returns [`ArrowError::IpcError`] if the writer has already been finished, or an error
    /// if encoding or writing the batch fails.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write record batch to file writer as it is closed".to_string(),
            ));
        }

        let meta = self.data_gen.write(
            batch,
            &mut self.dictionary_tracker,
            &self.write_options,
            &mut self.ipc_write_context,
            &mut self.writer,
        )?;

        // Each dictionary message emitted for this batch becomes a footer dictionary block.
        for (header_len, body_len) in meta.dictionary_block_sizes {
            let block = crate::Block::new(
                self.block_offsets as i64,
                header_len as i32,
                body_len as i64,
            );
            self.dictionary_blocks.push(block);
            self.block_offsets += header_len + body_len;
        }

        // add a record block for the footer
        let block = crate::Block::new(
            self.block_offsets as i64,
            meta.padded_header_len as i32,
            meta.body_len as i64,
        );
        self.record_blocks.push(block);
        self.block_offsets += meta.padded_header_len + meta.body_len;
        Ok(())
    }

    /// Write footer and closing tag, then mark the writer as done
    ///
    /// This writes the end-of-stream marker, the flatbuffer footer (schema, dictionary and
    /// record batch blocks, and any custom metadata), the footer length as a little-endian
    /// `i32`, and the trailing magic bytes, then flushes the writer.
    ///
    /// # Errors
    ///
    /// Returns [`ArrowError::IpcError`] if the writer has already been finished, or an error
    /// if writing or flushing fails.
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write footer to file writer as it is closed".to_string(),
            ));
        }

        // write EOS
        write_continuation(&mut self.writer, &self.write_options, 0)?;

        let mut fbb = FlatBufferBuilder::new();
        let dictionaries = fbb.create_vector(&self.dictionary_blocks);
        let record_batches = fbb.create_vector(&self.record_blocks);

        // dictionaries are already written, so we can reset dictionary tracker to reuse for schema
        self.dictionary_tracker.clear();
        let schema = IpcSchemaEncoder::new()
            .with_dictionary_tracker(&mut self.dictionary_tracker)
            .schema_to_fb_offset(&mut fbb, &self.schema);
        let fb_custom_metadata = (!self.custom_metadata.is_empty())
            .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));

        let root = {
            let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
            footer_builder.add_version(self.write_options.metadata_version);
            footer_builder.add_schema(schema);
            footer_builder.add_dictionaries(dictionaries);
            footer_builder.add_recordBatches(record_batches);
            if let Some(fb_custom_metadata) = fb_custom_metadata {
                footer_builder.add_custom_metadata(fb_custom_metadata);
            }
            footer_builder.finish()
        };
        fbb.finish(root, None);
        let footer_data = fbb.finished_data();
        self.writer.write_all(footer_data)?;
        // The footer length is followed by the closing magic bytes.
        self.writer
            .write_all(&(footer_data.len() as i32).to_le_bytes())?;
        self.writer.write_all(&super::ARROW_MAGIC)?;
        self.writer.flush()?;
        self.finished = true;

        Ok(())
    }

    /// Returns the arrow [`SchemaRef`] for this arrow file.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Gets a reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }

    /// Gets a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }

    /// Flush the underlying writer.
    ///
    /// This calls `flush` on `W`. When `W` is a [`BufWriter`], this flushes its buffer and
    /// then the writer it wraps.
    pub fn flush(&mut self) -> Result<(), ArrowError> {
        self.writer.flush()?;
        Ok(())
    }

    /// Unwraps the underlying writer.
    ///
    /// The writer is flushed and the FileWriter is finished before returning.
    ///
    /// # Errors
    ///
    /// An [`Err`] may be returned if an error occurs while finishing the FileWriter
    /// or while flushing the writer.
    pub fn into_inner(mut self) -> Result<W, ArrowError> {
        if !self.finished {
            // `finish` flushes the writer.
            self.finish()?;
        }
        Ok(self.writer)
    }
}
1505
1506impl<W: Write> RecordBatchWriter for FileWriter<W> {
1507    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1508        self.write(batch)
1509    }
1510
1511    fn close(mut self) -> Result<(), ArrowError> {
1512        self.finish()
1513    }
1514}
1515
/// Arrow Stream Writer
///
/// Writes Arrow [`RecordBatch`]es to bytes using the [IPC Streaming Format].
///
/// # See Also
///
/// * [`FileWriter`] for writing IPC Files
///
/// # Example - Basic usage
/// ```
/// # use arrow_array::record_batch;
/// # use arrow_ipc::writer::StreamWriter;
/// # let mut stream = vec![]; // mimic a stream for the example
/// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// // create a new writer, the schema must be known in advance
/// let mut writer = StreamWriter::try_new(&mut stream, &batch.schema()).unwrap();
/// // write each batch to the underlying stream
/// writer.write(&batch).unwrap();
/// // When all batches are written, call finish to flush all buffers
/// writer.finish().unwrap();
/// ```
/// # Example - Efficient delta dictionaries
/// ```
/// # use arrow_array::record_batch;
/// # use arrow_ipc::writer::{StreamWriter, IpcWriteOptions};
/// # use arrow_ipc::writer::DictionaryHandling;
/// # use arrow_schema::{DataType, Field, Schema, SchemaRef};
/// # use arrow_array::{
/// #    builder::StringDictionaryBuilder, types::Int32Type, Array, ArrayRef, DictionaryArray,
/// #    RecordBatch, StringArray,
/// # };
/// # use std::sync::Arc;
///
/// let schema = Arc::new(Schema::new(vec![Field::new(
///    "col1",
///    DataType::Dictionary(Box::from(DataType::Int32), Box::from(DataType::Utf8)),
///    true,
/// )]));
///
/// let mut builder = StringDictionaryBuilder::<arrow_array::types::Int32Type>::new();
///
/// // `finish_preserve_values` will keep the dictionary values along with their
/// // key assignments so that they can be re-used in the next batch.
/// builder.append("a").unwrap();
/// builder.append("b").unwrap();
/// let array1 = builder.finish_preserve_values();
/// let batch1 = RecordBatch::try_new(schema.clone(), vec![Arc::new(array1) as ArrayRef]).unwrap();
///
/// // In this batch, 'a' will have the same dictionary key as 'a' in the previous batch,
/// // and 'd' will take the next available key.
/// builder.append("a").unwrap();
/// builder.append("d").unwrap();
/// let array2 = builder.finish_preserve_values();
/// let batch2 = RecordBatch::try_new(schema.clone(), vec![Arc::new(array2) as ArrayRef]).unwrap();
///
/// let mut stream = vec![];
/// // You must set `.with_dictionary_handling(DictionaryHandling::Delta)` to
/// // enable delta dictionaries in the writer
/// let options = IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta);
/// let mut writer = StreamWriter::try_new_with_options(&mut stream, &schema, options).unwrap();
///
/// // When writing the first batch, a dictionary message with 'a' and 'b' will be written
/// // prior to the record batch.
/// writer.write(&batch1).unwrap();
/// // With the second batch only a delta dictionary with 'd' will be written
/// // prior to the record batch. This is only possible with `finish_preserve_values`.
/// // Without it, 'a' and 'd' in this batch would have different keys than the
/// // first batch and so we'd have to send a replacement dictionary with new keys
/// // for both.
/// writer.write(&batch2).unwrap();
/// writer.finish().unwrap();
/// ```
/// [IPC Streaming Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
pub struct StreamWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// Whether the writer has been finished (see `StreamWriter::finish`). The stream format
    /// has no footer, so this does not indicate that one was written.
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,

    /// Encodes each record batch, and the dictionaries it needs, into IPC messages
    data_gen: IpcDataGenerator,

    /// Reusable encoding state (including the flatbuffer builder) shared across messages
    ipc_write_context: IpcWriteContext,
}
1603
1604impl<W: Write> StreamWriter<BufWriter<W>> {
1605    /// Try to create a new stream writer with the writer wrapped in a BufWriter.
1606    ///
1607    /// See [`StreamWriter::try_new`] for an unbuffered version.
1608    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1609        Self::try_new(BufWriter::new(writer), schema)
1610    }
1611}
1612
1613impl<W: Write> StreamWriter<W> {
1614    /// Try to create a new writer, with the schema written as part of the header.
1615    ///
1616    /// Note that there is no internal buffering. See also [`StreamWriter::try_new_buffered`].
1617    ///
1618    /// # Errors
1619    ///
1620    /// An ['Err'](Result::Err) may be returned if writing the header to the writer fails.
1621    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1622        let write_options = IpcWriteOptions::default();
1623        Self::try_new_with_options(writer, schema, write_options)
1624    }
1625
1626    /// Try to create a new writer with [`IpcWriteOptions`].
1627    ///
1628    /// # Errors
1629    ///
1630    /// An ['Err'](Result::Err) may be returned if writing the header to the writer fails.
1631    pub fn try_new_with_options(
1632        mut writer: W,
1633        schema: &Schema,
1634        write_options: IpcWriteOptions,
1635    ) -> Result<Self, ArrowError> {
1636        let data_gen = IpcDataGenerator::default();
1637        let mut dictionary_tracker = DictionaryTracker::new(false);
1638
1639        // write the schema, set the written bytes to the schema
1640        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
1641            schema,
1642            &mut dictionary_tracker,
1643            &write_options,
1644        );
1645        write_message(&mut writer, encoded_message, &write_options)?;
1646        Ok(Self {
1647            writer,
1648            write_options,
1649            finished: false,
1650            dictionary_tracker,
1651            data_gen,
1652            ipc_write_context: IpcWriteContext::default(),
1653        })
1654    }
1655
1656    /// Write a record batch to the stream
1657    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1658        if self.finished {
1659            return Err(ArrowError::IpcError(
1660                "Cannot write record batch to stream writer as it is closed".to_string(),
1661            ));
1662        }
1663
1664        self.data_gen.write(
1665            batch,
1666            &mut self.dictionary_tracker,
1667            &self.write_options,
1668            &mut self.ipc_write_context,
1669            &mut self.writer,
1670        )?;
1671        Ok(())
1672    }
1673
1674    /// Write continuation bytes, and mark the stream as done
1675    pub fn finish(&mut self) -> Result<(), ArrowError> {
1676        if self.finished {
1677            return Err(ArrowError::IpcError(
1678                "Cannot write footer to stream writer as it is closed".to_string(),
1679            ));
1680        }
1681
1682        write_continuation(&mut self.writer, &self.write_options, 0)?;
1683        self.writer.flush()?;
1684
1685        self.finished = true;
1686
1687        Ok(())
1688    }
1689
1690    /// Gets a reference to the underlying writer.
1691    pub fn get_ref(&self) -> &W {
1692        &self.writer
1693    }
1694
1695    /// Gets a mutable reference to the underlying writer.
1696    ///
1697    /// It is inadvisable to directly write to the underlying writer.
1698    pub fn get_mut(&mut self) -> &mut W {
1699        &mut self.writer
1700    }
1701
1702    /// Flush the underlying writer.
1703    ///
1704    /// Both the BufWriter and the underlying writer are flushed.
1705    pub fn flush(&mut self) -> Result<(), ArrowError> {
1706        self.writer.flush()?;
1707        Ok(())
1708    }
1709
1710    /// Unwraps the the underlying writer.
1711    ///
1712    /// The writer is flushed and the StreamWriter is finished before returning.
1713    ///
1714    /// # Errors
1715    ///
1716    /// An ['Err'](Result::Err) may be returned if an error occurs while finishing the StreamWriter
1717    /// or while flushing the writer.
1718    ///
1719    /// # Example
1720    ///
1721    /// ```
1722    /// # use arrow_ipc::writer::{StreamWriter, IpcWriteOptions};
1723    /// # use arrow_ipc::MetadataVersion;
1724    /// # use arrow_schema::{ArrowError, Schema};
1725    /// # fn main() -> Result<(), ArrowError> {
1726    /// // The result we expect from an empty schema
1727    /// let expected = vec![
1728    ///     255, 255, 255, 255,  48,   0,   0,   0,
1729    ///      16,   0,   0,   0,   0,   0,  10,   0,
1730    ///      12,   0,  10,   0,   9,   0,   4,   0,
1731    ///      10,   0,   0,   0,  16,   0,   0,   0,
1732    ///       0,   1,   4,   0,   8,   0,   8,   0,
1733    ///       0,   0,   4,   0,   8,   0,   0,   0,
1734    ///       4,   0,   0,   0,   0,   0,   0,   0,
1735    ///     255, 255, 255, 255,   0,   0,   0,   0
1736    /// ];
1737    ///
1738    /// let schema = Schema::empty();
1739    /// let buffer: Vec<u8> = Vec::new();
1740    /// let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)?;
1741    /// let stream_writer = StreamWriter::try_new_with_options(buffer, &schema, options)?;
1742    ///
1743    /// assert_eq!(stream_writer.into_inner()?, expected);
1744    /// # Ok(())
1745    /// # }
1746    /// ```
1747    pub fn into_inner(mut self) -> Result<W, ArrowError> {
1748        if !self.finished {
1749            // `finish` flushes.
1750            self.finish()?;
1751        }
1752        Ok(self.writer)
1753    }
1754}
1755
1756impl<W: Write> RecordBatchWriter for StreamWriter<W> {
1757    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1758        self.write(batch)
1759    }
1760
1761    fn close(mut self) -> Result<(), ArrowError> {
1762        self.finish()
1763    }
1764}
1765
/// Stores one encoded IPC message: the flatbuffer-encoded `crate::Message` metadata and the
/// optional Arrow body data that follows it.
pub struct EncodedData {
    /// The encoded `crate::Message` flatbuffer
    pub ipc_message: Vec<u8>,
    /// Arrow buffers to be written after the message; should be an empty vec for schema messages
    pub arrow_data: Vec<u8>,
}
1773/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
1774pub fn write_message<W: Write>(
1775    mut writer: W,
1776    encoded: EncodedData,
1777    write_options: &IpcWriteOptions,
1778) -> Result<(usize, usize), ArrowError> {
1779    let arrow_data_len = encoded.arrow_data.len();
1780    if arrow_data_len % usize::from(write_options.alignment) != 0 {
1781        return Err(ArrowError::MemoryError(
1782            "Arrow data not aligned".to_string(),
1783        ));
1784    }
1785
1786    let a = usize::from(write_options.alignment - 1);
1787    let buffer = encoded.ipc_message;
1788    let flatbuf_size = buffer.len();
1789    let prefix_size = if write_options.write_legacy_ipc_format {
1790        4
1791    } else {
1792        8
1793    };
1794    let aligned_size = (flatbuf_size + prefix_size + a) & !a;
1795    let padding_bytes = aligned_size - flatbuf_size - prefix_size;
1796
1797    write_continuation(
1798        &mut writer,
1799        write_options,
1800        (aligned_size - prefix_size) as i32,
1801    )?;
1802
1803    // write the flatbuf
1804    if flatbuf_size > 0 {
1805        writer.write_all(&buffer)?;
1806    }
1807    // write padding
1808    writer.write_all(&PADDING[..padding_bytes])?;
1809
1810    // write arrow data
1811    let body_len = if arrow_data_len > 0 {
1812        write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
1813    } else {
1814        0
1815    };
1816
1817    Ok((aligned_size, body_len))
1818}
1819
1820fn write_body_buffers<W: Write>(
1821    mut writer: W,
1822    data: &[u8],
1823    alignment: u8,
1824) -> Result<usize, ArrowError> {
1825    let len = data.len();
1826    let pad_len = pad_to_alignment(alignment, len);
1827    let total_len = len + pad_len;
1828
1829    // write body buffer
1830    writer.write_all(data)?;
1831    if pad_len > 0 {
1832        writer.write_all(&PADDING[..pad_len])?;
1833    }
1834
1835    Ok(total_len)
1836}
1837
1838/// Write a record batch to the writer, writing the message size before the message
1839/// if the record batch is being written to a stream
1840fn write_continuation<W: Write>(
1841    mut writer: W,
1842    write_options: &IpcWriteOptions,
1843    total_len: i32,
1844) -> Result<usize, ArrowError> {
1845    let mut written = 8;
1846
1847    // the version of the writer determines whether continuation markers should be added
1848    match write_options.metadata_version {
1849        crate::MetadataVersion::V1 | crate::MetadataVersion::V2 | crate::MetadataVersion::V3 => {
1850            unreachable!("Options with the metadata version cannot be created")
1851        }
1852        crate::MetadataVersion::V4 => {
1853            if !write_options.write_legacy_ipc_format {
1854                // v0.15.0 format
1855                writer.write_all(&CONTINUATION_MARKER)?;
1856                written = 4;
1857            }
1858            writer.write_all(&total_len.to_le_bytes()[..])?;
1859        }
1860        crate::MetadataVersion::V5 => {
1861            // write continuation marker and message length
1862            writer.write_all(&CONTINUATION_MARKER)?;
1863            writer.write_all(&total_len.to_le_bytes()[..])?;
1864        }
1865        z => panic!("Unsupported crate::MetadataVersion {z:?}"),
1866    };
1867
1868    Ok(written)
1869}
1870
1871/// In V4, null types have no validity bitmap
1872/// In V5 and later, null and union types have no validity bitmap
1873/// Run end encoded type has no validity bitmap.
1874fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) -> bool {
1875    if write_options.metadata_version < crate::MetadataVersion::V5 {
1876        !matches!(data_type, DataType::Null)
1877    } else {
1878        !matches!(
1879            data_type,
1880            DataType::Null | DataType::Union(_, _) | DataType::RunEndEncoded(_, _)
1881        )
1882    }
1883}
1884
1885/// Whether to truncate the buffer
1886#[inline]
1887fn buffer_need_truncate(
1888    array_offset: usize,
1889    buffer: &Buffer,
1890    spec: &BufferSpec,
1891    min_length: usize,
1892) -> bool {
1893    spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
1894}
1895
1896/// Returns byte width for a buffer spec. Only for `BufferSpec::FixedWidth`.
1897#[inline]
1898fn get_buffer_element_width(spec: &BufferSpec) -> usize {
1899    match spec {
1900        BufferSpec::FixedWidth { byte_width, .. } => *byte_width,
1901        _ => 0,
1902    }
1903}
1904
1905/// Common functionality for re-encoding offsets. Returns the new offsets as well as
1906/// original start offset and length for use in slicing child data.
1907fn reencode_offsets<O: OffsetSizeTrait>(
1908    offsets: &Buffer,
1909    data: &ArrayData,
1910) -> (Buffer, usize, usize) {
1911    let offsets_slice: &[O] = offsets.typed_data::<O>();
1912    let offset_slice = &offsets_slice[data.offset()..data.offset() + data.len() + 1];
1913
1914    let start_offset = offset_slice.first().unwrap();
1915    let end_offset = offset_slice.last().unwrap();
1916
1917    let offsets = match start_offset.as_usize() {
1918        0 => {
1919            let size = size_of::<O>();
1920            offsets.slice_with_length(data.offset() * size, (data.len() + 1) * size)
1921        }
1922        _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
1923    };
1924
1925    let start_offset = start_offset.as_usize();
1926    let end_offset = end_offset.as_usize();
1927
1928    (offsets, start_offset, end_offset - start_offset)
1929}
1930
1931/// Returns the values and offsets [`Buffer`] for a ByteArray with offset type `O`
1932///
1933/// In particular, this handles re-encoding the offsets if they don't start at `0`,
1934/// slicing the values buffer as appropriate. This helps reduce the encoded
1935/// size of sliced arrays, as values that have been sliced away are not encoded
1936fn get_byte_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Buffer) {
1937    if data.is_empty() {
1938        // As per specification, offsets buffer has N+1 elements.
1939        // So an empty array should still be encoded with a single 0 offset.
1940        let mut offsets = MutableBuffer::new(size_of::<O>());
1941        offsets.extend_from_slice(O::usize_as(0).to_byte_slice());
1942        return (offsets.into(), MutableBuffer::new(0).into());
1943    }
1944
1945    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1946    let values = data.buffers()[1].slice_with_length(original_start_offset, len);
1947    (offsets, values)
1948}
1949
1950/// Similar logic as [`get_byte_array_buffers()`] but slices the child array instead
1951/// of a values buffer.
1952fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, ArrayData) {
1953    if data.is_empty() {
1954        // As per specification, offsets buffer has N+1 elements.
1955        // So an empty array should still be encoded with a single 0 offset.
1956        let mut offsets = MutableBuffer::new(size_of::<O>());
1957        offsets.extend_from_slice(O::usize_as(0).to_byte_slice());
1958        return (offsets.into(), data.child_data()[0].slice(0, 0));
1959    }
1960
1961    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1962    let child_data = data.child_data()[0].slice(original_start_offset, len);
1963    (offsets, child_data)
1964}
1965
1966/// Returns the offsets, sizes, and child data buffers for a ListView array.
1967///
1968/// Unlike List arrays, ListView arrays store both offsets and sizes explicitly,
1969/// and offsets can be non-monotonic. When slicing, we simply pass through the
1970/// offsets and sizes without re-encoding, and do not slice the child data.
1971fn get_list_view_array_buffers<O: OffsetSizeTrait>(
1972    data: &ArrayData,
1973) -> (Buffer, Buffer, ArrayData) {
1974    if data.is_empty() {
1975        return (
1976            MutableBuffer::new(0).into(),
1977            MutableBuffer::new(0).into(),
1978            data.child_data()[0].slice(0, 0),
1979        );
1980    }
1981
1982    let offsets = &data.buffers()[0];
1983    let sizes = &data.buffers()[1];
1984
1985    let element_size = std::mem::size_of::<O>();
1986    let offsets_slice =
1987        offsets.slice_with_length(data.offset() * element_size, data.len() * element_size);
1988    let sizes_slice =
1989        sizes.slice_with_length(data.offset() * element_size, data.len() * element_size);
1990
1991    let child_data = data.child_data()[0].clone();
1992
1993    (offsets_slice, sizes_slice, child_data)
1994}
1995
1996/// Returns the sliced views [`Buffer`] for a BinaryView/Utf8View array.
1997///
1998/// The views buffer is sliced to only include views in the valid range based on
1999/// the array's offset and length. This helps reduce the encoded size of sliced
2000/// arrays
2001///
2002fn get_or_truncate_buffer(array_data: &ArrayData) -> Buffer {
2003    let buffer = &array_data.buffers()[0];
2004    let layout = layout(array_data.data_type());
2005    let spec = &layout.buffers[0];
2006
2007    let byte_width = get_buffer_element_width(spec);
2008    let min_length = array_data.len() * byte_width;
2009    if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
2010        let byte_offset = array_data.offset() * byte_width;
2011        let buffer_length = min(min_length, buffer.len() - byte_offset);
2012        buffer.slice_with_length(byte_offset, buffer_length)
2013    } else {
2014        buffer.clone()
2015    }
2016}
2017
/// Recursively encodes `array_data` into its IPC representation.
///
/// Output goes to two separate channels:
/// - `meta`: accumulates IPC metadata (`nodes` and `buffers`) for the flatbuffer header.
/// - `sink`: the raw Arrow data bytes that form the IPC message body.
///
/// Exactly one field node is pushed for this array. Its buffers follow in layout order
/// (validity bitmap first, when `has_validity_bitmap` says so), and then each child array
/// is written recursively.
///
/// - `offset`: running byte offset into the message body at which this array's first buffer
///   starts.
/// - `compression_codec` / `ipc_write_context`: forwarded to `encode_sink_buffer` for every
///   buffer.
/// - `write_options`: supplies the metadata version (for validity-bitmap rules) and the
///   buffer alignment.
///
/// Returns the body offset just past the last buffer written for this array and its
/// descendants.
///
/// # Errors
///
/// Propagates errors from `encode_sink_buffer` (e.g. compression) and from
/// `unslice_run_array`.
fn write_array_data(
    array_data: &ArrayData,
    meta: &mut IpcMetadataBuilder,
    sink: &mut IpcBodySink<'_>,
    offset: i64,
    compression_codec: Option<CompressionCodec>,
    ipc_write_context: &mut IpcWriteContext,
    write_options: &IpcWriteOptions,
) -> Result<i64, ArrowError> {
    let mut offset = offset;
    let num_rows = array_data.len();
    // One field node (length, null count) per array, in depth-first order.
    if !matches!(array_data.data_type(), DataType::Null) {
        meta.nodes.push(crate::FieldNode::new(
            num_rows as i64,
            array_data.null_count() as i64,
        ));
    } else {
        // NullArray's null_count equals to len, but ArrayData null_count is always 0.
        meta.nodes
            .push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
    }
    if has_validity_bitmap(array_data.data_type(), write_options) {
        // write null buffer if exists
        let null_buffer = match array_data.nulls() {
            None => {
                // create a buffer and fill it with valid bits
                // (the layout still expects a bitmap even when there are no nulls)
                let num_bytes = bit_util::ceil(num_rows, 8);
                let buffer = MutableBuffer::new(num_bytes);
                let buffer = buffer.with_bitset(num_bytes, true);
                buffer.into()
            }
            // `sliced()` yields a bitmap that starts at this array's first row.
            Some(buffer) => buffer.inner().sliced(),
        };

        offset = encode_sink_buffer(
            null_buffer,
            meta,
            sink,
            offset,
            compression_codec,
            ipc_write_context,
            write_options.alignment,
        )?;
    }

    let data_type = array_data.data_type();
    if matches!(data_type, DataType::Binary | DataType::Utf8) {
        // Offsets are rebased to 0 and values narrowed to the referenced bytes.
        let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
        for buffer in [offsets, values] {
            offset = encode_sink_buffer(
                buffer,
                meta,
                sink,
                offset,
                compression_codec,
                ipc_write_context,
                write_options.alignment,
            )?;
        }
    } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
        // Slicing the views buffer is safe and easy,
        // but pruning unneeded data buffers is much more nuanced since it's complicated to prove that no views reference the pruned buffers
        //
        // Current implementation just serialize the raw arrays as given and not try to optimize anything.
        // If users wants to "compact" the arrays prior to sending them over IPC,
        // they should consider the gc API suggested in #5513
        let views = get_or_truncate_buffer(array_data);
        offset = encode_sink_buffer(
            views,
            meta,
            sink,
            offset,
            compression_codec,
            ipc_write_context,
            write_options.alignment,
        )?;

        // Every variadic data buffer (index 1 onwards) is written whole.
        for buffer in array_data.buffers().iter().skip(1) {
            offset = encode_sink_buffer(
                buffer.clone(),
                meta,
                sink,
                offset,
                compression_codec,
                ipc_write_context,
                write_options.alignment,
            )?;
        }
    } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
        // Same as Binary/Utf8 but with 64-bit offsets.
        let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
        for buffer in [offsets, values] {
            offset = encode_sink_buffer(
                buffer,
                meta,
                sink,
                offset,
                compression_codec,
                ipc_write_context,
                write_options.alignment,
            )?;
        }
    } else if DataType::is_numeric(data_type)
        || DataType::is_temporal(data_type)
        || matches!(
            array_data.data_type(),
            DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
        )
    {
        // Truncate values
        // (for Dictionary this is the keys buffer; values are not written here)
        assert_eq!(array_data.buffers().len(), 1);

        let buffer = get_or_truncate_buffer(array_data);
        offset = encode_sink_buffer(
            buffer,
            meta,
            sink,
            offset,
            compression_codec,
            ipc_write_context,
            write_options.alignment,
        )?;
    } else if matches!(data_type, DataType::Boolean) {
        // Bools are special because the payload (= 1 bit) is smaller than the physical container elements (= bytes).
        // The array data may not start at the physical boundary of the underlying buffer, so we need to shift bits around.
        assert_eq!(array_data.buffers().len(), 1);

        let buffer = &array_data.buffers()[0];
        let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
        offset = encode_sink_buffer(
            buffer,
            meta,
            sink,
            offset,
            compression_codec,
            ipc_write_context,
            write_options.alignment,
        )?;
    } else if matches!(
        data_type,
        DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _)
    ) {
        assert_eq!(array_data.buffers().len(), 1);
        assert_eq!(array_data.child_data().len(), 1);

        // Truncate offsets and the child data to avoid writing unnecessary data
        let (offsets, sliced_child_data) = match data_type {
            DataType::List(_) => get_list_array_buffers::<i32>(array_data),
            DataType::Map(_, _) => get_list_array_buffers::<i32>(array_data),
            DataType::LargeList(_) => get_list_array_buffers::<i64>(array_data),
            _ => unreachable!(),
        };
        offset = encode_sink_buffer(
            offsets,
            meta,
            sink,
            offset,
            compression_codec,
            ipc_write_context,
            write_options.alignment,
        )?;
        offset = write_array_data(
            &sliced_child_data,
            meta,
            sink,
            offset,
            compression_codec,
            ipc_write_context,
            write_options,
        )?;
        // The (sliced) child was written above; skip the generic child loop below.
        return Ok(offset);
    } else if matches!(
        data_type,
        DataType::ListView(_) | DataType::LargeListView(_)
    ) {
        assert_eq!(array_data.buffers().len(), 2); // offsets + sizes
        assert_eq!(array_data.child_data().len(), 1);

        // Offsets/sizes are sliced but not rebased, so the child is written whole.
        let (offsets, sizes, child_data) = match data_type {
            DataType::ListView(_) => get_list_view_array_buffers::<i32>(array_data),
            DataType::LargeListView(_) => get_list_view_array_buffers::<i64>(array_data),
            _ => unreachable!(),
        };

        offset = encode_sink_buffer(
            offsets,
            meta,
            sink,
            offset,
            compression_codec,
            ipc_write_context,
            write_options.alignment,
        )?;
        offset = encode_sink_buffer(
            sizes,
            meta,
            sink,
            offset,
            compression_codec,
            ipc_write_context,
            write_options.alignment,
        )?;

        offset = write_array_data(
            &child_data,
            meta,
            sink,
            offset,
            compression_codec,
            ipc_write_context,
            write_options,
        )?;
        return Ok(offset);
    } else if let DataType::FixedSizeList(_, fixed_size) = data_type {
        // No buffers of its own beyond validity; write only the child range this array covers.
        assert_eq!(array_data.child_data().len(), 1);
        let fixed_size = *fixed_size as usize;

        let child_offset = array_data.offset() * fixed_size;
        let child_length = array_data.len() * fixed_size;
        let child_data = array_data.child_data()[0].slice(child_offset, child_length);

        offset = write_array_data(
            &child_data,
            meta,
            sink,
            offset,
            compression_codec,
            ipc_write_context,
            write_options,
        )?;
        return Ok(offset);
    } else {
        // Remaining types (e.g. Struct, Union, RunEndEncoded, Null) write whatever buffers they
        // hold, as-is. NOTE(review): `array_data.offset()` is not applied to these buffers —
        // confirm callers never reach here with offset data whose buffers need slicing.
        for buffer in array_data.buffers() {
            offset = encode_sink_buffer(
                buffer.clone(),
                meta,
                sink,
                offset,
                compression_codec,
                ipc_write_context,
                write_options.alignment,
            )?;
        }
    }

    match array_data.data_type() {
        // Dictionary values are not children of the column here; they are not written by this
        // function.
        DataType::Dictionary(_, _) => {}
        DataType::RunEndEncoded(_, _) => {
            // unslice the run encoded array.
            // (`unslice_run_array` is defined elsewhere; presumably it rewrites run ends and
            // values so the children match the logical slice — confirm.)
            let arr = unslice_run_array(array_data.clone())?;
            // recursively write out nested structures
            for data_ref in arr.child_data() {
                // write the nested data (e.g list data)
                offset = write_array_data(
                    data_ref,
                    meta,
                    sink,
                    offset,
                    compression_codec,
                    ipc_write_context,
                    write_options,
                )?;
            }
        }
        _ => {
            // recursively write out nested structures
            for data_ref in array_data.child_data() {
                // write the nested data (e.g list data)
                offset = write_array_data(
                    data_ref,
                    meta,
                    sink,
                    offset,
                    compression_codec,
                    ipc_write_context,
                    write_options,
                )?;
            }
        }
    }
    Ok(offset)
}
2304
2305/// Encodes a single Arrow [`Buffer`] into the IPC body and records its metadata.
2306///
2307/// - `buffer`: the Arrow data buffer to encode (validity bitmap, offsets, values, etc.)
2308/// - `buffers`: in-progress list of IPC `Buffer` metadata entries (body offset + length) that
2309///   will eventually be serialised into the flatbuffer `RecordBatch` header.
2310/// - `sink`: destination for the actual encoded bytes; either a contiguous `Vec<u8>` for
2311///   in-memory writes, or a list of [`EncodedBuffer`] segments for deferred zero-copy streaming.
2312/// - `offset`: running byte offset into the IPC message body, used to compute the metadata entry.
2313/// - `compression_codec` / `ipc_write_context`: if `Some`, the buffer is compressed before
2314///   writing; `ipc_write_context` provides reusable scratch space across calls.
2315/// - `alignment`: each buffer is padded to this many bytes so the next buffer starts aligned.
2316///
2317/// Returns the updated `offset` (advanced by the encoded length plus any alignment padding).
2318fn encode_sink_buffer(
2319    buffer: Buffer,
2320    ipc_meta_data: &mut IpcMetadataBuilder,
2321    sink: &mut IpcBodySink<'_>,
2322    offset: i64,
2323    compression_codec: Option<CompressionCodec>,
2324    ipc_write_context: &mut IpcWriteContext,
2325    alignment: u8,
2326) -> Result<i64, ArrowError> {
2327    let (encoded, len) = match compression_codec {
2328        None => {
2329            let len = buffer.len() as i64;
2330            (EncodedBuffer::Raw(buffer), len)
2331        }
2332        Some(codec) => {
2333            let mut scratch = Vec::new();
2334            let written =
2335                codec.compress_to_vec(buffer.as_slice(), &mut scratch, ipc_write_context)?;
2336            let len = i64::try_from(written)
2337                .map_err(|e| ArrowError::InvalidArgumentError(format!("{e}")))?;
2338            (EncodedBuffer::Compressed(scratch), len)
2339        }
2340    };
2341
2342    let pad_len = pad_to_alignment(alignment, len as usize);
2343    sink.write(pad_len, encoded);
2344    ipc_meta_data.buffers.push(crate::Buffer::new(offset, len));
2345    Ok(offset + len + pad_len as i64)
2346}
2347
/// A run of zero bytes that is sliced (`PADDING[..n]`) to emit alignment padding after
/// message metadata and body buffers.
///
/// NOTE(review): slicing panics if `n > 64`. This presumes every padding length stays below
/// the configured alignment and that alignment never exceeds 64 bytes — confirm against
/// `IpcWriteOptions` validation.
const PADDING: [u8; 64] = [0; 64];
2349
2350/// Estimates the number of [`EncodedBuffer`] segments that [`write_array_data`]
2351/// will produce for a column of the given type.
2352///
2353/// Based on the Arrow IPC buffer layout
2354/// (<https://arrow.apache.org/docs/format/Columnar.html#recordbatch-message>):
2355#[inline]
2356fn estimate_encoded_buffer_count(dt: &DataType) -> usize {
2357    match dt {
2358        DataType::Null => 0,
2359
2360        DataType::Binary | DataType::Utf8 | DataType::LargeBinary | DataType::LargeUtf8 => 3,
2361
2362        DataType::BinaryView | DataType::Utf8View => 3,
2363
2364        DataType::List(f) | DataType::LargeList(f) | DataType::Map(f, _) => {
2365            2 + estimate_encoded_buffer_count(f.data_type())
2366        }
2367
2368        DataType::ListView(f) | DataType::LargeListView(f) => {
2369            3 + estimate_encoded_buffer_count(f.data_type())
2370        }
2371
2372        DataType::FixedSizeList(f, _) => 1 + estimate_encoded_buffer_count(f.data_type()),
2373
2374        DataType::Struct(fields) => {
2375            1 + fields
2376                .iter()
2377                .map(|f| estimate_encoded_buffer_count(f.data_type()))
2378                .sum::<usize>()
2379        }
2380
2381        // Dictionary indices only; dictionary body is a separate IPC message.
2382        DataType::Dictionary(_, _) => 2,
2383
2384        DataType::Union(fields, UnionMode::Sparse) => {
2385            1 + fields
2386                .iter()
2387                .map(|(_, f)| estimate_encoded_buffer_count(f.data_type()))
2388                .sum::<usize>()
2389        }
2390        DataType::Union(fields, UnionMode::Dense) => {
2391            2 + fields
2392                .iter()
2393                .map(|(_, f)| estimate_encoded_buffer_count(f.data_type()))
2394                .sum::<usize>()
2395        }
2396
2397        DataType::RunEndEncoded(run_ends, values) => {
2398            estimate_encoded_buffer_count(run_ends.data_type())
2399                + estimate_encoded_buffer_count(values.data_type())
2400        }
2401        // Primitive, Bool, temporal, Decimal*, FixedSizeBinary: validity + values.
2402        _ => 2,
2403    }
2404}
2405
/// Returns how many padding bytes must follow `len` bytes so that the next
/// byte starts on a multiple of `alignment`.
///
/// The rounding uses a bit mask, so `alignment` is assumed to be a non-zero
/// power of two (for example 8 or 64). Other values give meaningless results.
#[inline]
fn pad_to_alignment(alignment: u8, len: usize) -> usize {
    let mask: usize = (alignment - 1).into();
    let rounded_up = (len + mask) & !mask;
    rounded_up - len
}
2412
2413#[cfg(test)]
2414mod tests {
2415    use std::hash::Hasher;
2416    use std::io::Cursor;
2417    use std::io::Seek;
2418
2419    use arrow_array::builder::FixedSizeListBuilder;
2420    use arrow_array::builder::Float32Builder;
2421    use arrow_array::builder::Int64Builder;
2422    use arrow_array::builder::MapBuilder;
2423    use arrow_array::builder::StringViewBuilder;
2424    use arrow_array::builder::UnionBuilder;
2425    use arrow_array::builder::{
2426        GenericListBuilder, GenericListViewBuilder, ListBuilder, StringBuilder,
2427    };
2428    use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
2429    use arrow_array::types::*;
2430    use arrow_buffer::ScalarBuffer;
2431
2432    use crate::MetadataVersion;
2433    use crate::convert::fb_to_schema;
2434    use crate::reader::*;
2435    use crate::root_as_footer;
2436
2437    use super::*;
2438
2439    fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
2440        let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
2441        writer.write(rb).unwrap();
2442        writer.finish().unwrap();
2443        writer.into_inner().unwrap()
2444    }
2445
2446    fn deserialize_file(bytes: Vec<u8>) -> RecordBatch {
2447        let mut reader = FileReader::try_new(Cursor::new(bytes), None).unwrap();
2448        reader.next().unwrap().unwrap()
2449    }
2450
2451    fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
2452        // Use 8-byte alignment so that the various `truncate_*` tests can be compactly written,
2453        // without needing to construct a giant array to spill over the 64-byte default alignment
2454        // boundary.
2455        const IPC_ALIGNMENT: usize = 8;
2456
2457        let mut stream_writer = StreamWriter::try_new_with_options(
2458            vec![],
2459            record.schema_ref(),
2460            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
2461        )
2462        .unwrap();
2463        stream_writer.write(record).unwrap();
2464        stream_writer.finish().unwrap();
2465        stream_writer.into_inner().unwrap()
2466    }
2467
2468    fn deserialize_stream(bytes: Vec<u8>) -> RecordBatch {
2469        let mut stream_reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
2470        stream_reader.next().unwrap().unwrap()
2471    }
2472
2473    #[test]
2474    #[cfg(feature = "lz4")]
2475    fn test_write_empty_record_batch_lz4_compression() {
2476        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2477        let values: Vec<Option<i32>> = vec![];
2478        let array = Int32Array::from(values);
2479        let record_batch =
2480            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2481
2482        let mut file = tempfile::tempfile().unwrap();
2483
2484        {
2485            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2486                .unwrap()
2487                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
2488                .unwrap();
2489
2490            let mut writer =
2491                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2492            writer.write(&record_batch).unwrap();
2493            writer.finish().unwrap();
2494        }
2495        file.rewind().unwrap();
2496        {
2497            // read file
2498            let reader = FileReader::try_new(file, None).unwrap();
2499            for read_batch in reader {
2500                read_batch
2501                    .unwrap()
2502                    .columns()
2503                    .iter()
2504                    .zip(record_batch.columns())
2505                    .for_each(|(a, b)| {
2506                        assert_eq!(a.data_type(), b.data_type());
2507                        assert_eq!(a.len(), b.len());
2508                        assert_eq!(a.null_count(), b.null_count());
2509                    });
2510            }
2511        }
2512    }
2513
2514    #[test]
2515    #[cfg(feature = "lz4")]
2516    fn test_write_file_with_lz4_compression() {
2517        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2518        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
2519        let array = Int32Array::from(values);
2520        let record_batch =
2521            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2522
2523        let mut file = tempfile::tempfile().unwrap();
2524        {
2525            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2526                .unwrap()
2527                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
2528                .unwrap();
2529
2530            let mut writer =
2531                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2532            writer.write(&record_batch).unwrap();
2533            writer.finish().unwrap();
2534        }
2535        file.rewind().unwrap();
2536        {
2537            // read file
2538            let reader = FileReader::try_new(file, None).unwrap();
2539            for read_batch in reader {
2540                read_batch
2541                    .unwrap()
2542                    .columns()
2543                    .iter()
2544                    .zip(record_batch.columns())
2545                    .for_each(|(a, b)| {
2546                        assert_eq!(a.data_type(), b.data_type());
2547                        assert_eq!(a.len(), b.len());
2548                        assert_eq!(a.null_count(), b.null_count());
2549                    });
2550            }
2551        }
2552    }
2553
2554    #[test]
2555    #[cfg(feature = "zstd")]
2556    fn test_write_file_with_zstd_compression() {
2557        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2558        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
2559        let array = Int32Array::from(values);
2560        let record_batch =
2561            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2562        let mut file = tempfile::tempfile().unwrap();
2563        {
2564            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2565                .unwrap()
2566                .try_with_compression(Some(crate::CompressionType::ZSTD))
2567                .unwrap()
2568                .try_with_compression_level(Some(1))
2569                .unwrap();
2570
2571            let mut writer =
2572                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2573            writer.write(&record_batch).unwrap();
2574            writer.finish().unwrap();
2575        }
2576        file.rewind().unwrap();
2577        {
2578            // read file
2579            let reader = FileReader::try_new(file, None).unwrap();
2580            for read_batch in reader {
2581                read_batch
2582                    .unwrap()
2583                    .columns()
2584                    .iter()
2585                    .zip(record_batch.columns())
2586                    .for_each(|(a, b)| {
2587                        assert_eq!(a.data_type(), b.data_type());
2588                        assert_eq!(a.len(), b.len());
2589                        assert_eq!(a.null_count(), b.null_count());
2590                    });
2591            }
2592        }
2593    }
2594
2595    #[test]
2596    fn test_write_file() {
2597        let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
2598        let values: Vec<Option<u32>> = vec![
2599            Some(999),
2600            None,
2601            Some(235),
2602            Some(123),
2603            None,
2604            None,
2605            None,
2606            None,
2607            None,
2608        ];
2609        let array1 = UInt32Array::from(values);
2610        let batch =
2611            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array1) as ArrayRef])
2612                .unwrap();
2613        let mut file = tempfile::tempfile().unwrap();
2614        {
2615            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2616
2617            writer.write(&batch).unwrap();
2618            writer.finish().unwrap();
2619        }
2620        file.rewind().unwrap();
2621
2622        {
2623            let mut reader = FileReader::try_new(file, None).unwrap();
2624            while let Some(Ok(read_batch)) = reader.next() {
2625                read_batch
2626                    .columns()
2627                    .iter()
2628                    .zip(batch.columns())
2629                    .for_each(|(a, b)| {
2630                        assert_eq!(a.data_type(), b.data_type());
2631                        assert_eq!(a.len(), b.len());
2632                        assert_eq!(a.null_count(), b.null_count());
2633                    });
2634            }
2635        }
2636    }
2637
2638    #[test]
2639    fn test_empty_utf8_ipc_writes_nonempty_offsets_buffer() {
2640        let name = StringArray::from(Vec::<String>::new());
2641        let (offsets, values) = get_byte_array_buffers::<i32>(&name.to_data());
2642
2643        assert_eq!(name.len(), 0);
2644        assert_eq!(
2645            offsets.len(),
2646            std::mem::size_of::<i32>(),
2647            "offsets buffer should contain one zero i32 offset"
2648        );
2649        assert_eq!(values.len(), 0, "values buffer should remain empty");
2650    }
2651
2652    #[test]
2653    fn test_empty_large_utf8_ipc_writes_nonempty_offsets_buffer() {
2654        let name = LargeStringArray::from(Vec::<String>::new());
2655        let (offsets, values) = get_byte_array_buffers::<i64>(&name.to_data());
2656
2657        assert_eq!(name.len(), 0);
2658        assert_eq!(
2659            offsets.len(),
2660            std::mem::size_of::<i64>(),
2661            "offsets buffer should contain one zero i64 offset"
2662        );
2663        assert_eq!(values.len(), 0, "values buffer should remain empty");
2664    }
2665
2666    #[test]
2667    fn test_empty_list_ipc_writes_nonempty_offsets_buffer() {
2668        let list = GenericListBuilder::<i32, _>::new(UInt32Builder::new()).finish();
2669        let (offsets, child_data) = get_list_array_buffers::<i32>(&list.to_data());
2670
2671        assert_eq!(list.len(), 0);
2672        assert_eq!(
2673            offsets.len(),
2674            std::mem::size_of::<i32>(),
2675            "offsets buffer should contain one zero i32 offset"
2676        );
2677        assert_eq!(child_data.len(), 0, "child data should remain empty");
2678    }
2679
2680    #[test]
2681    fn test_empty_large_list_ipc_writes_nonempty_offsets_buffer() {
2682        let list = GenericListBuilder::<i64, _>::new(UInt32Builder::new()).finish();
2683        let (offsets, child_data) = get_list_array_buffers::<i64>(&list.to_data());
2684
2685        assert_eq!(list.len(), 0);
2686        assert_eq!(
2687            offsets.len(),
2688            std::mem::size_of::<i64>(),
2689            "offsets buffer should contain one zero i64 offset"
2690        );
2691        assert_eq!(child_data.len(), 0, "child data should remain empty");
2692    }
2693
2694    fn write_null_file(options: IpcWriteOptions) {
2695        let schema = Schema::new(vec![
2696            Field::new("nulls", DataType::Null, true),
2697            Field::new("int32s", DataType::Int32, false),
2698            Field::new("nulls2", DataType::Null, true),
2699            Field::new("f64s", DataType::Float64, false),
2700        ]);
2701        let array1 = NullArray::new(32);
2702        let array2 = Int32Array::from(vec![1; 32]);
2703        let array3 = NullArray::new(32);
2704        let array4 = Float64Array::from(vec![f64::NAN; 32]);
2705        let batch = RecordBatch::try_new(
2706            Arc::new(schema.clone()),
2707            vec![
2708                Arc::new(array1) as ArrayRef,
2709                Arc::new(array2) as ArrayRef,
2710                Arc::new(array3) as ArrayRef,
2711                Arc::new(array4) as ArrayRef,
2712            ],
2713        )
2714        .unwrap();
2715        let mut file = tempfile::tempfile().unwrap();
2716        {
2717            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2718
2719            writer.write(&batch).unwrap();
2720            writer.finish().unwrap();
2721        }
2722
2723        file.rewind().unwrap();
2724
2725        {
2726            let reader = FileReader::try_new(file, None).unwrap();
2727            reader.for_each(|maybe_batch| {
2728                maybe_batch
2729                    .unwrap()
2730                    .columns()
2731                    .iter()
2732                    .zip(batch.columns())
2733                    .for_each(|(a, b)| {
2734                        assert_eq!(a.data_type(), b.data_type());
2735                        assert_eq!(a.len(), b.len());
2736                        assert_eq!(a.null_count(), b.null_count());
2737                    });
2738            });
2739        }
2740    }
2741    #[test]
2742    fn test_write_null_file_v4() {
2743        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2744        write_null_file(IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap());
2745        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V4).unwrap());
2746        write_null_file(IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap());
2747    }
2748
2749    #[test]
2750    fn test_write_null_file_v5() {
2751        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2752        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V5).unwrap());
2753    }
2754
2755    #[test]
2756    fn track_union_nested_dict() {
2757        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2758
2759        let array = Arc::new(inner) as ArrayRef;
2760
2761        // Dict field with id 2
2762        #[allow(deprecated)]
2763        let dctfield = Field::new_dict("dict", array.data_type().clone(), false, 0, false);
2764        let union_fields = [(0, Arc::new(dctfield))].into_iter().collect();
2765
2766        let types = [0, 0, 0].into_iter().collect::<ScalarBuffer<i8>>();
2767        let offsets = [0, 1, 2].into_iter().collect::<ScalarBuffer<i32>>();
2768
2769        let union = UnionArray::try_new(union_fields, types, Some(offsets), vec![array]).unwrap();
2770
2771        let schema = Arc::new(Schema::new(vec![Field::new(
2772            "union",
2773            union.data_type().clone(),
2774            false,
2775        )]));
2776
2777        let r#gen = IpcDataGenerator::default();
2778        let mut dict_tracker = DictionaryTracker::new(false);
2779        r#gen.schema_to_bytes_with_dictionary_tracker(
2780            &schema,
2781            &mut dict_tracker,
2782            &IpcWriteOptions::default(),
2783        );
2784
2785        let batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
2786
2787        r#gen
2788            .encode(
2789                &batch,
2790                &mut dict_tracker,
2791                &Default::default(),
2792                &mut Default::default(),
2793            )
2794            .unwrap();
2795
2796        // The encoder will assign dict IDs itself to ensure uniqueness and ignore the dict ID in the schema
2797        // so we expect the dict will be keyed to 0
2798        assert!(dict_tracker.written.contains_key(&0));
2799    }
2800
2801    #[test]
2802    fn track_struct_nested_dict() {
2803        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2804
2805        let array = Arc::new(inner) as ArrayRef;
2806
2807        // Dict field with id 2
2808        #[allow(deprecated)]
2809        let dctfield = Arc::new(Field::new_dict(
2810            "dict",
2811            array.data_type().clone(),
2812            false,
2813            2,
2814            false,
2815        ));
2816
2817        let s = StructArray::from(vec![(dctfield, array)]);
2818        let struct_array = Arc::new(s) as ArrayRef;
2819
2820        let schema = Arc::new(Schema::new(vec![Field::new(
2821            "struct",
2822            struct_array.data_type().clone(),
2823            false,
2824        )]));
2825
2826        let r#gen = IpcDataGenerator::default();
2827        let mut dict_tracker = DictionaryTracker::new(false);
2828        r#gen.schema_to_bytes_with_dictionary_tracker(
2829            &schema,
2830            &mut dict_tracker,
2831            &IpcWriteOptions::default(),
2832        );
2833
2834        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2835
2836        r#gen
2837            .encode(
2838                &batch,
2839                &mut dict_tracker,
2840                &Default::default(),
2841                &mut Default::default(),
2842            )
2843            .unwrap();
2844
2845        assert!(dict_tracker.written.contains_key(&0));
2846    }
2847
2848    fn write_union_file(options: IpcWriteOptions) {
2849        let schema = Schema::new(vec![Field::new_union(
2850            "union",
2851            vec![0, 1],
2852            vec![
2853                Field::new("a", DataType::Int32, false),
2854                Field::new("c", DataType::Float64, false),
2855            ],
2856            UnionMode::Sparse,
2857        )]);
2858        let mut builder = UnionBuilder::with_capacity_sparse(5);
2859        builder.append::<Int32Type>("a", 1).unwrap();
2860        builder.append_null::<Int32Type>("a").unwrap();
2861        builder.append::<Float64Type>("c", 3.0).unwrap();
2862        builder.append_null::<Float64Type>("c").unwrap();
2863        builder.append::<Int32Type>("a", 4).unwrap();
2864        let union = builder.build().unwrap();
2865
2866        let batch =
2867            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(union) as ArrayRef])
2868                .unwrap();
2869
2870        let mut file = tempfile::tempfile().unwrap();
2871        {
2872            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2873
2874            writer.write(&batch).unwrap();
2875            writer.finish().unwrap();
2876        }
2877        file.rewind().unwrap();
2878
2879        {
2880            let reader = FileReader::try_new(file, None).unwrap();
2881            reader.for_each(|maybe_batch| {
2882                maybe_batch
2883                    .unwrap()
2884                    .columns()
2885                    .iter()
2886                    .zip(batch.columns())
2887                    .for_each(|(a, b)| {
2888                        assert_eq!(a.data_type(), b.data_type());
2889                        assert_eq!(a.len(), b.len());
2890                        assert_eq!(a.null_count(), b.null_count());
2891                    });
2892            });
2893        }
2894    }
2895
2896    #[test]
2897    fn test_write_union_file_v4_v5() {
2898        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2899        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2900    }
2901
2902    #[test]
2903    fn test_write_view_types() {
2904        const LONG_TEST_STRING: &str =
2905            "This is a long string to make sure binary view array handles it";
2906        let schema = Schema::new(vec![
2907            Field::new("field1", DataType::BinaryView, true),
2908            Field::new("field2", DataType::Utf8View, true),
2909        ]);
2910        let values: Vec<Option<&[u8]>> = vec![
2911            Some(b"foo"),
2912            Some(b"bar"),
2913            Some(LONG_TEST_STRING.as_bytes()),
2914        ];
2915        let binary_array = BinaryViewArray::from_iter(values);
2916        let utf8_array =
2917            StringViewArray::from_iter(vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]);
2918        let record_batch = RecordBatch::try_new(
2919            Arc::new(schema.clone()),
2920            vec![Arc::new(binary_array), Arc::new(utf8_array)],
2921        )
2922        .unwrap();
2923
2924        let mut file = tempfile::tempfile().unwrap();
2925        {
2926            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2927            writer.write(&record_batch).unwrap();
2928            writer.finish().unwrap();
2929        }
2930        file.rewind().unwrap();
2931        {
2932            let mut reader = FileReader::try_new(&file, None).unwrap();
2933            let read_batch = reader.next().unwrap().unwrap();
2934            read_batch
2935                .columns()
2936                .iter()
2937                .zip(record_batch.columns())
2938                .for_each(|(a, b)| {
2939                    assert_eq!(a, b);
2940                });
2941        }
2942        file.rewind().unwrap();
2943        {
2944            let mut reader = FileReader::try_new(&file, Some(vec![0])).unwrap();
2945            let read_batch = reader.next().unwrap().unwrap();
2946            assert_eq!(read_batch.num_columns(), 1);
2947            let read_array = read_batch.column(0);
2948            let write_array = record_batch.column(0);
2949            assert_eq!(read_array, write_array);
2950        }
2951    }
2952
2953    #[test]
2954    fn truncate_ipc_record_batch() {
2955        fn create_batch(rows: usize) -> RecordBatch {
2956            let schema = Schema::new(vec![
2957                Field::new("a", DataType::Int32, false),
2958                Field::new("b", DataType::Utf8, false),
2959            ]);
2960
2961            let a = Int32Array::from_iter_values(0..rows as i32);
2962            let b = StringArray::from_iter_values((0..rows).map(|i| i.to_string()));
2963
2964            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
2965        }
2966
2967        let big_record_batch = create_batch(65536);
2968
2969        let length = 5;
2970        let small_record_batch = create_batch(length);
2971
2972        let offset = 2;
2973        let record_batch_slice = big_record_batch.slice(offset, length);
2974        assert!(
2975            serialize_stream(&big_record_batch).len() > serialize_stream(&small_record_batch).len()
2976        );
2977        assert_eq!(
2978            serialize_stream(&small_record_batch).len(),
2979            serialize_stream(&record_batch_slice).len()
2980        );
2981
2982        assert_eq!(
2983            deserialize_stream(serialize_stream(&record_batch_slice)),
2984            record_batch_slice
2985        );
2986    }
2987
2988    #[test]
2989    fn truncate_ipc_record_batch_with_nulls() {
2990        fn create_batch() -> RecordBatch {
2991            let schema = Schema::new(vec![
2992                Field::new("a", DataType::Int32, true),
2993                Field::new("b", DataType::Utf8, true),
2994            ]);
2995
2996            let a = Int32Array::from(vec![Some(1), None, Some(1), None, Some(1)]);
2997            let b = StringArray::from(vec![None, Some("a"), Some("a"), None, Some("a")]);
2998
2999            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
3000        }
3001
3002        let record_batch = create_batch();
3003        let record_batch_slice = record_batch.slice(1, 2);
3004        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
3005
3006        assert!(
3007            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
3008        );
3009
3010        assert!(deserialized_batch.column(0).is_null(0));
3011        assert!(deserialized_batch.column(0).is_valid(1));
3012        assert!(deserialized_batch.column(1).is_valid(0));
3013        assert!(deserialized_batch.column(1).is_valid(1));
3014
3015        assert_eq!(record_batch_slice, deserialized_batch);
3016    }
3017
3018    #[test]
3019    fn truncate_ipc_dictionary_array() {
3020        fn create_batch() -> RecordBatch {
3021            let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
3022                .into_iter()
3023                .collect();
3024            let keys: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
3025
3026            let array = DictionaryArray::new(keys, Arc::new(values));
3027
3028            let schema = Schema::new(vec![Field::new("dict", array.data_type().clone(), true)]);
3029
3030            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
3031        }
3032
3033        let record_batch = create_batch();
3034        let record_batch_slice = record_batch.slice(1, 2);
3035        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
3036
3037        assert!(
3038            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
3039        );
3040
3041        assert!(deserialized_batch.column(0).is_valid(0));
3042        assert!(deserialized_batch.column(0).is_null(1));
3043
3044        assert_eq!(record_batch_slice, deserialized_batch);
3045    }
3046
3047    #[test]
3048    fn truncate_ipc_struct_array() {
3049        fn create_batch() -> RecordBatch {
3050            let strings: StringArray = [Some("foo"), None, Some("bar"), Some("baz")]
3051                .into_iter()
3052                .collect();
3053            let ints: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
3054
3055            let struct_array = StructArray::from(vec![
3056                (
3057                    Arc::new(Field::new("s", DataType::Utf8, true)),
3058                    Arc::new(strings) as ArrayRef,
3059                ),
3060                (
3061                    Arc::new(Field::new("c", DataType::Int32, true)),
3062                    Arc::new(ints) as ArrayRef,
3063                ),
3064            ]);
3065
3066            let schema = Schema::new(vec![Field::new(
3067                "struct_array",
3068                struct_array.data_type().clone(),
3069                true,
3070            )]);
3071
3072            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(struct_array)]).unwrap()
3073        }
3074
3075        let record_batch = create_batch();
3076        let record_batch_slice = record_batch.slice(1, 2);
3077        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
3078
3079        assert!(
3080            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
3081        );
3082
3083        let structs = deserialized_batch
3084            .column(0)
3085            .as_any()
3086            .downcast_ref::<StructArray>()
3087            .unwrap();
3088
3089        assert!(structs.column(0).is_null(0));
3090        assert!(structs.column(0).is_valid(1));
3091        assert!(structs.column(1).is_valid(0));
3092        assert!(structs.column(1).is_null(1));
3093        assert_eq!(record_batch_slice, deserialized_batch);
3094    }
3095
3096    #[test]
3097    fn truncate_ipc_string_array_with_all_empty_string() {
3098        fn create_batch() -> RecordBatch {
3099            let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
3100            let a = StringArray::from(vec![Some(""), Some(""), Some(""), Some(""), Some("")]);
3101            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap()
3102        }
3103
3104        let record_batch = create_batch();
3105        let record_batch_slice = record_batch.slice(0, 1);
3106        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
3107
3108        assert!(
3109            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
3110        );
3111        assert_eq!(record_batch_slice, deserialized_batch);
3112    }
3113
3114    #[test]
3115    fn test_stream_writer_writes_array_slice() {
3116        let array = UInt32Array::from(vec![Some(1), Some(2), Some(3)]);
3117        assert_eq!(
3118            vec![Some(1), Some(2), Some(3)],
3119            array.iter().collect::<Vec<_>>()
3120        );
3121
3122        let sliced = array.slice(1, 2);
3123        assert_eq!(vec![Some(2), Some(3)], sliced.iter().collect::<Vec<_>>());
3124
3125        let batch = RecordBatch::try_new(
3126            Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)])),
3127            vec![Arc::new(sliced)],
3128        )
3129        .expect("new batch");
3130
3131        let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
3132        writer.write(&batch).expect("write");
3133        let outbuf = writer.into_inner().expect("inner");
3134
3135        let mut reader = StreamReader::try_new(&outbuf[..], None).expect("new reader");
3136        let read_batch = reader.next().unwrap().expect("read batch");
3137
3138        let read_array: &UInt32Array = read_batch.column(0).as_primitive();
3139        assert_eq!(
3140            vec![Some(2), Some(3)],
3141            read_array.iter().collect::<Vec<_>>()
3142        );
3143    }
3144
3145    #[test]
3146    fn test_large_slice_uint32() {
3147        ensure_roundtrip(Arc::new(UInt32Array::from_iter(
3148            (0..8000).map(|i| if i % 2 == 0 { Some(i) } else { None }),
3149        )));
3150    }
3151
3152    #[test]
3153    fn test_large_slice_string() {
3154        let strings: Vec<_> = (0..8000)
3155            .map(|i| {
3156                if i % 2 == 0 {
3157                    Some(format!("value{i}"))
3158                } else {
3159                    None
3160                }
3161            })
3162            .collect();
3163
3164        ensure_roundtrip(Arc::new(StringArray::from(strings)));
3165    }
3166
3167    #[test]
3168    fn test_large_slice_string_list() {
3169        let mut ls = ListBuilder::new(StringBuilder::new());
3170
3171        let mut s = String::new();
3172        for row_number in 0..8000 {
3173            if row_number % 2 == 0 {
3174                for list_element in 0..1000 {
3175                    s.clear();
3176                    use std::fmt::Write;
3177                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
3178                    ls.values().append_value(&s);
3179                }
3180                ls.append(true)
3181            } else {
3182                ls.append(false); // null
3183            }
3184        }
3185
3186        ensure_roundtrip(Arc::new(ls.finish()));
3187    }
3188
3189    #[test]
3190    fn test_large_slice_string_list_of_lists() {
3191        // The reason for the special test is to verify reencode_offsets which looks both at
3192        // the starting offset and the data offset.  So need a dataset where the starting_offset
3193        // is zero but the data offset is not.
3194        let mut ls = ListBuilder::new(ListBuilder::new(StringBuilder::new()));
3195
3196        for _ in 0..4000 {
3197            ls.values().append(true);
3198            ls.append(true)
3199        }
3200
3201        let mut s = String::new();
3202        for row_number in 0..4000 {
3203            if row_number % 2 == 0 {
3204                for list_element in 0..1000 {
3205                    s.clear();
3206                    use std::fmt::Write;
3207                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
3208                    ls.values().values().append_value(&s);
3209                }
3210                ls.values().append(true);
3211                ls.append(true)
3212            } else {
3213                ls.append(false); // null
3214            }
3215        }
3216
3217        ensure_roundtrip(Arc::new(ls.finish()));
3218    }
3219
3220    /// Read/write a record batch to a File and Stream and ensure it is the same at the outout
3221    fn ensure_roundtrip(array: ArrayRef) {
3222        let num_rows = array.len();
3223        let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
3224        // take off the first element
3225        let sliced_batch = orig_batch.slice(1, num_rows - 1);
3226
3227        let schema = orig_batch.schema();
3228        let stream_data = {
3229            let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
3230            writer.write(&sliced_batch).unwrap();
3231            writer.into_inner().unwrap()
3232        };
3233        let read_batch = {
3234            let projection = None;
3235            let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
3236            reader
3237                .next()
3238                .expect("expect no errors reading batch")
3239                .expect("expect batch")
3240        };
3241        assert_eq!(sliced_batch, read_batch);
3242
3243        let file_data = {
3244            let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
3245            writer.write(&sliced_batch).unwrap();
3246            writer.into_inner().unwrap().into_inner().unwrap()
3247        };
3248        let read_batch = {
3249            let projection = None;
3250            let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
3251            reader
3252                .next()
3253                .expect("expect no errors reading batch")
3254                .expect("expect batch")
3255        };
3256        assert_eq!(sliced_batch, read_batch);
3257
3258        // TODO test file writer/reader
3259    }
3260
3261    #[test]
3262    fn encode_bools_slice() {
3263        // Test case for https://github.com/apache/arrow-rs/issues/3496
3264        assert_bool_roundtrip([true, false], 1, 1);
3265
3266        // slice somewhere in the middle
3267        assert_bool_roundtrip(
3268            [
3269                true, false, true, true, false, false, true, true, true, false, false, false, true,
3270                true, true, true, false, false, false, false, true, true, true, true, true, false,
3271                false, false, false, false,
3272            ],
3273            13,
3274            17,
3275        );
3276
3277        // start at byte boundary, end in the middle
3278        assert_bool_roundtrip(
3279            [
3280                true, false, true, true, false, false, true, true, true, false, false, false,
3281            ],
3282            8,
3283            2,
3284        );
3285
3286        // start and stop and byte boundary
3287        assert_bool_roundtrip(
3288            [
3289                true, false, true, true, false, false, true, true, true, false, false, false, true,
3290                true, true, true, true, false, false, false, false, false,
3291            ],
3292            8,
3293            8,
3294        );
3295    }
3296
3297    fn assert_bool_roundtrip<const N: usize>(bools: [bool; N], offset: usize, length: usize) {
3298        let val_bool_field = Field::new("val", DataType::Boolean, false);
3299
3300        let schema = Arc::new(Schema::new(vec![val_bool_field]));
3301
3302        let bools = BooleanArray::from(bools.to_vec());
3303
3304        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(bools)]).unwrap();
3305        let batch = batch.slice(offset, length);
3306
3307        let data = serialize_stream(&batch);
3308        let batch2 = deserialize_stream(data);
3309        assert_eq!(batch, batch2);
3310    }
3311
3312    #[test]
3313    fn test_run_array_unslice() {
3314        let total_len = 80;
3315        let vals: Vec<Option<i32>> = vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
3316        let repeats: Vec<usize> = vec![3, 4, 1, 2];
3317        let mut input_array: Vec<Option<i32>> = Vec::with_capacity(total_len);
3318        for ix in 0_usize..32 {
3319            let repeat: usize = repeats[ix % repeats.len()];
3320            let val: Option<i32> = vals[ix % vals.len()];
3321            input_array.resize(input_array.len() + repeat, val);
3322        }
3323
3324        // Encode the input_array to run array
3325        let mut builder =
3326            PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
3327        builder.extend(input_array.iter().copied());
3328        let run_array = builder.finish();
3329
3330        // test for all slice lengths.
3331        for slice_len in 1..=total_len {
3332            // test for offset = 0, slice length = slice_len
3333            let sliced_run_array: RunArray<Int16Type> =
3334                run_array.slice(0, slice_len).into_data().into();
3335
3336            // Create unsliced run array.
3337            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
3338            let typed = unsliced_run_array
3339                .downcast::<PrimitiveArray<Int32Type>>()
3340                .unwrap();
3341            let expected: Vec<Option<i32>> = input_array.iter().take(slice_len).copied().collect();
3342            let actual: Vec<Option<i32>> = typed.into_iter().collect();
3343            assert_eq!(expected, actual);
3344
3345            // test for offset = total_len - slice_len, length = slice_len
3346            let sliced_run_array: RunArray<Int16Type> = run_array
3347                .slice(total_len - slice_len, slice_len)
3348                .into_data()
3349                .into();
3350
3351            // Create unsliced run array.
3352            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
3353            let typed = unsliced_run_array
3354                .downcast::<PrimitiveArray<Int32Type>>()
3355                .unwrap();
3356            let expected: Vec<Option<i32>> = input_array
3357                .iter()
3358                .skip(total_len - slice_len)
3359                .copied()
3360                .collect();
3361            let actual: Vec<Option<i32>> = typed.into_iter().collect();
3362            assert_eq!(expected, actual);
3363        }
3364    }
3365
3366    fn generate_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3367        let mut ls = GenericListBuilder::<O, _>::new(UInt32Builder::new());
3368
3369        for i in 0..100_000 {
3370            for value in [i, i, i] {
3371                ls.values().append_value(value);
3372            }
3373            ls.append(true)
3374        }
3375
3376        ls.finish()
3377    }
3378
3379    fn generate_utf8view_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3380        let mut ls = GenericListBuilder::<O, _>::new(StringViewBuilder::new());
3381
3382        for i in 0..100_000 {
3383            for value in [
3384                format!("value{}", i),
3385                format!("value{}", i),
3386                format!("value{}", i),
3387            ] {
3388                ls.values().append_value(&value);
3389            }
3390            ls.append(true)
3391        }
3392
3393        ls.finish()
3394    }
3395
3396    fn generate_string_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3397        let mut ls = GenericListBuilder::<O, _>::new(StringBuilder::new());
3398
3399        for i in 0..100_000 {
3400            for value in [
3401                format!("value{}", i),
3402                format!("value{}", i),
3403                format!("value{}", i),
3404            ] {
3405                ls.values().append_value(&value);
3406            }
3407            ls.append(true)
3408        }
3409
3410        ls.finish()
3411    }
3412
3413    fn generate_nested_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3414        let mut ls =
3415            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
3416
3417        for _i in 0..10_000 {
3418            for j in 0..10 {
3419                for value in [j, j, j, j] {
3420                    ls.values().values().append_value(value);
3421                }
3422                ls.values().append(true)
3423            }
3424            ls.append(true);
3425        }
3426
3427        ls.finish()
3428    }
3429
3430    fn generate_nested_list_data_starting_at_zero<O: OffsetSizeTrait>() -> GenericListArray<O> {
3431        let mut ls =
3432            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
3433
3434        for _i in 0..999 {
3435            ls.values().append(true);
3436            ls.append(true);
3437        }
3438
3439        for j in 0..10 {
3440            for value in [j, j, j, j] {
3441                ls.values().values().append_value(value);
3442            }
3443            ls.values().append(true)
3444        }
3445        ls.append(true);
3446
3447        for i in 0..9_000 {
3448            for j in 0..10 {
3449                for value in [i + j, i + j, i + j, i + j] {
3450                    ls.values().values().append_value(value);
3451                }
3452                ls.values().append(true)
3453            }
3454            ls.append(true);
3455        }
3456
3457        ls.finish()
3458    }
3459
3460    fn generate_map_array_data() -> MapArray {
3461        let keys_builder = UInt32Builder::new();
3462        let values_builder = UInt32Builder::new();
3463
3464        let mut builder = MapBuilder::new(None, keys_builder, values_builder);
3465
3466        for i in 0..100_000 {
3467            for _j in 0..3 {
3468                builder.keys().append_value(i);
3469                builder.values().append_value(i * 2);
3470            }
3471            builder.append(true).unwrap();
3472        }
3473
3474        builder.finish()
3475    }
3476
3477    #[test]
3478    fn reencode_offsets_when_first_offset_is_not_zero() {
3479        let original_list = generate_list_data::<i32>();
3480        let original_data = original_list.into_data();
3481        let slice_data = original_data.slice(75, 7);
3482        let (new_offsets, original_start, length) =
3483            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
3484        assert_eq!(
3485            vec![0, 3, 6, 9, 12, 15, 18, 21],
3486            new_offsets.typed_data::<i32>()
3487        );
3488        assert_eq!(225, original_start);
3489        assert_eq!(21, length);
3490    }
3491
3492    #[test]
3493    fn reencode_offsets_when_first_offset_is_zero() {
3494        let mut ls = GenericListBuilder::<i32, _>::new(UInt32Builder::new());
3495        // ls = [[], [35, 42]
3496        ls.append(true);
3497        ls.values().append_value(35);
3498        ls.values().append_value(42);
3499        ls.append(true);
3500        let original_list = ls.finish();
3501        let original_data = original_list.into_data();
3502
3503        let slice_data = original_data.slice(1, 1);
3504        let (new_offsets, original_start, length) =
3505            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
3506        assert_eq!(vec![0, 2], new_offsets.typed_data::<i32>());
3507        assert_eq!(0, original_start);
3508        assert_eq!(2, length);
3509    }
3510
3511    /// Ensure when serde full & sliced versions they are equal to original input.
3512    /// Also ensure serialized sliced version is significantly smaller than serialized full.
3513    fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {
3514        // test both full and sliced versions
3515        let in_sliced = in_batch.slice(999, 1);
3516
3517        let bytes_batch = serialize_file(&in_batch);
3518        let bytes_sliced = serialize_file(&in_sliced);
3519
3520        // serializing 1 row should be significantly smaller than serializing 100,000
3521        assert!(bytes_sliced.len() < (bytes_batch.len() / expected_size_factor));
3522
3523        // ensure both are still valid and equal to originals
3524        let out_batch = deserialize_file(bytes_batch);
3525        assert_eq!(in_batch, out_batch);
3526
3527        let out_sliced = deserialize_file(bytes_sliced);
3528        assert_eq!(in_sliced, out_sliced);
3529    }
3530
3531    #[test]
3532    fn encode_lists() {
3533        let val_inner = Field::new_list_field(DataType::UInt32, true);
3534        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
3535        let schema = Arc::new(Schema::new(vec![val_list_field]));
3536
3537        let values = Arc::new(generate_list_data::<i32>());
3538
3539        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3540        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3541    }
3542
3543    #[test]
3544    fn encode_empty_list() {
3545        let val_inner = Field::new_list_field(DataType::UInt32, true);
3546        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
3547        let schema = Arc::new(Schema::new(vec![val_list_field]));
3548
3549        let values = Arc::new(generate_list_data::<i32>());
3550
3551        let in_batch = RecordBatch::try_new(schema, vec![values])
3552            .unwrap()
3553            .slice(999, 0);
3554        let out_batch = deserialize_file(serialize_file(&in_batch));
3555        assert_eq!(in_batch, out_batch);
3556    }
3557
3558    #[test]
3559    fn encode_large_lists() {
3560        let val_inner = Field::new_list_field(DataType::UInt32, true);
3561        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3562        let schema = Arc::new(Schema::new(vec![val_list_field]));
3563
3564        let values = Arc::new(generate_list_data::<i64>());
3565
3566        // ensure when serde full & sliced versions they are equal to original input
3567        // also ensure serialized sliced version is significantly smaller than serialized full
3568        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3569        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3570    }
3571
3572    #[test]
3573    fn encode_large_lists_non_zero_offset() {
3574        let val_inner = Field::new_list_field(DataType::UInt32, true);
3575        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3576        let schema = Arc::new(Schema::new(vec![val_list_field]));
3577
3578        let values = Arc::new(generate_list_data::<i64>());
3579
3580        check_sliced_list_array(schema, values);
3581    }
3582
3583    #[test]
3584    fn encode_large_lists_string_non_zero_offset() {
3585        let val_inner = Field::new_list_field(DataType::Utf8, true);
3586        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3587        let schema = Arc::new(Schema::new(vec![val_list_field]));
3588
3589        let values = Arc::new(generate_string_list_data::<i64>());
3590
3591        check_sliced_list_array(schema, values);
3592    }
3593
3594    #[test]
3595    fn encode_large_list_string_view_non_zero_offset() {
3596        let val_inner = Field::new_list_field(DataType::Utf8View, true);
3597        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3598        let schema = Arc::new(Schema::new(vec![val_list_field]));
3599
3600        let values = Arc::new(generate_utf8view_list_data::<i64>());
3601
3602        check_sliced_list_array(schema, values);
3603    }
3604
3605    fn check_sliced_list_array(schema: Arc<Schema>, values: Arc<GenericListArray<i64>>) {
3606        for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3607            let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3608                .unwrap()
3609                .slice(offset, len);
3610            let out_batch = deserialize_file(serialize_file(&in_batch));
3611            assert_eq!(in_batch, out_batch);
3612        }
3613    }
3614
3615    #[test]
3616    fn encode_nested_lists() {
3617        let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
3618        let inner_list_field = Arc::new(Field::new_list_field(DataType::List(inner_int), true));
3619        let list_field = Field::new("val", DataType::List(inner_list_field), true);
3620        let schema = Arc::new(Schema::new(vec![list_field]));
3621
3622        let values = Arc::new(generate_nested_list_data::<i32>());
3623
3624        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3625        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3626    }
3627
3628    #[test]
3629    fn encode_nested_lists_starting_at_zero() {
3630        let inner_int = Arc::new(Field::new("item", DataType::UInt32, true));
3631        let inner_list_field = Arc::new(Field::new("item", DataType::List(inner_int), true));
3632        let list_field = Field::new("val", DataType::List(inner_list_field), true);
3633        let schema = Arc::new(Schema::new(vec![list_field]));
3634
3635        let values = Arc::new(generate_nested_list_data_starting_at_zero::<i32>());
3636
3637        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3638        roundtrip_ensure_sliced_smaller(in_batch, 1);
3639    }
3640
3641    #[test]
3642    fn encode_map_array() {
3643        let keys = Arc::new(Field::new("keys", DataType::UInt32, false));
3644        let values = Arc::new(Field::new("values", DataType::UInt32, true));
3645        let map_field = Field::new_map("map", "entries", keys, values, false, true);
3646        let schema = Arc::new(Schema::new(vec![map_field]));
3647
3648        let values = Arc::new(generate_map_array_data());
3649
3650        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3651        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3652    }
3653
3654    fn generate_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
3655        let mut builder = GenericListViewBuilder::<O, _>::new(UInt32Builder::new());
3656
3657        for i in 0u32..100_000 {
3658            if i.is_multiple_of(10_000) {
3659                builder.append(false);
3660                continue;
3661            }
3662            for value in [i, i, i] {
3663                builder.values().append_value(value);
3664            }
3665            builder.append(true);
3666        }
3667
3668        builder.finish()
3669    }
3670
3671    #[test]
3672    fn encode_list_view_arrays() {
3673        let val_inner = Field::new_list_field(DataType::UInt32, true);
3674        let val_field = Field::new("val", DataType::ListView(Arc::new(val_inner)), true);
3675        let schema = Arc::new(Schema::new(vec![val_field]));
3676
3677        let values = Arc::new(generate_list_view_data::<i32>());
3678
3679        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3680        let out_batch = deserialize_file(serialize_file(&in_batch));
3681        assert_eq!(in_batch, out_batch);
3682    }
3683
3684    #[test]
3685    fn encode_large_list_view_arrays() {
3686        let val_inner = Field::new_list_field(DataType::UInt32, true);
3687        let val_field = Field::new("val", DataType::LargeListView(Arc::new(val_inner)), true);
3688        let schema = Arc::new(Schema::new(vec![val_field]));
3689
3690        let values = Arc::new(generate_list_view_data::<i64>());
3691
3692        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3693        let out_batch = deserialize_file(serialize_file(&in_batch));
3694        assert_eq!(in_batch, out_batch);
3695    }
3696
3697    #[test]
3698    fn check_sliced_list_view_array() {
3699        let inner = Field::new_list_field(DataType::UInt32, true);
3700        let field = Field::new("val", DataType::ListView(Arc::new(inner)), true);
3701        let schema = Arc::new(Schema::new(vec![field]));
3702        let values = Arc::new(generate_list_view_data::<i32>());
3703
3704        for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3705            let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3706                .unwrap()
3707                .slice(offset, len);
3708            let out_batch = deserialize_file(serialize_file(&in_batch));
3709            assert_eq!(in_batch, out_batch);
3710        }
3711    }
3712
3713    #[test]
3714    fn check_sliced_large_list_view_array() {
3715        let inner = Field::new_list_field(DataType::UInt32, true);
3716        let field = Field::new("val", DataType::LargeListView(Arc::new(inner)), true);
3717        let schema = Arc::new(Schema::new(vec![field]));
3718        let values = Arc::new(generate_list_view_data::<i64>());
3719
3720        for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3721            let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3722                .unwrap()
3723                .slice(offset, len);
3724            let out_batch = deserialize_file(serialize_file(&in_batch));
3725            assert_eq!(in_batch, out_batch);
3726        }
3727    }
3728
3729    fn generate_nested_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
3730        let inner_builder = UInt32Builder::new();
3731        let middle_builder = GenericListViewBuilder::<O, _>::new(inner_builder);
3732        let mut outer_builder = GenericListViewBuilder::<O, _>::new(middle_builder);
3733
3734        for i in 0u32..10_000 {
3735            if i.is_multiple_of(1_000) {
3736                outer_builder.append(false);
3737                continue;
3738            }
3739
3740            for _ in 0..3 {
3741                for value in [i, i + 1, i + 2] {
3742                    outer_builder.values().values().append_value(value);
3743                }
3744                outer_builder.values().append(true);
3745            }
3746            outer_builder.append(true);
3747        }
3748
3749        outer_builder.finish()
3750    }
3751
3752    #[test]
3753    fn encode_nested_list_views() {
3754        let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
3755        let inner_list_field = Arc::new(Field::new_list_field(DataType::ListView(inner_int), true));
3756        let list_field = Field::new("val", DataType::ListView(inner_list_field), true);
3757        let schema = Arc::new(Schema::new(vec![list_field]));
3758
3759        let values = Arc::new(generate_nested_list_view_data::<i32>());
3760
3761        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3762        let out_batch = deserialize_file(serialize_file(&in_batch));
3763        assert_eq!(in_batch, out_batch);
3764    }
3765
3766    fn test_roundtrip_list_view_of_dict_impl<OffsetSize: OffsetSizeTrait, U: ArrowNativeType>(
3767        list_data_type: DataType,
3768        offsets: &[U; 5],
3769        sizes: &[U; 4],
3770    ) {
3771        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3772        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
3773        let dict_array = DictionaryArray::new(keys, Arc::new(values));
3774        let dict_data = dict_array.to_data();
3775
3776        let value_offsets = Buffer::from_slice_ref(offsets);
3777        let value_sizes = Buffer::from_slice_ref(sizes);
3778
3779        let list_data = ArrayData::builder(list_data_type)
3780            .len(4)
3781            .add_buffer(value_offsets)
3782            .add_buffer(value_sizes)
3783            .add_child_data(dict_data)
3784            .build()
3785            .unwrap();
3786        let list_view_array = GenericListViewArray::<OffsetSize>::from(list_data);
3787
3788        let schema = Arc::new(Schema::new(vec![Field::new(
3789            "f1",
3790            list_view_array.data_type().clone(),
3791            false,
3792        )]));
3793        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(list_view_array)]).unwrap();
3794
3795        let output_batch = deserialize_file(serialize_file(&input_batch));
3796        assert_eq!(input_batch, output_batch);
3797
3798        let output_batch = deserialize_stream(serialize_stream(&input_batch));
3799        assert_eq!(input_batch, output_batch);
3800    }
3801
3802    #[test]
3803    fn test_roundtrip_list_view_of_dict() {
3804        #[allow(deprecated)]
3805        let list_data_type = DataType::ListView(Arc::new(Field::new_dict(
3806            "item",
3807            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3808            true,
3809            1,
3810            false,
3811        )));
3812        let offsets: &[i32; 5] = &[0, 2, 4, 4, 7];
3813        let sizes: &[i32; 4] = &[2, 2, 0, 3];
3814        test_roundtrip_list_view_of_dict_impl::<i32, i32>(list_data_type, offsets, sizes);
3815    }
3816
3817    #[test]
3818    fn test_roundtrip_large_list_view_of_dict() {
3819        #[allow(deprecated)]
3820        let list_data_type = DataType::LargeListView(Arc::new(Field::new_dict(
3821            "item",
3822            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3823            true,
3824            2,
3825            false,
3826        )));
3827        let offsets: &[i64; 5] = &[0, 2, 4, 4, 7];
3828        let sizes: &[i64; 4] = &[2, 2, 0, 3];
3829        test_roundtrip_list_view_of_dict_impl::<i64, i64>(list_data_type, offsets, sizes);
3830    }
3831
3832    #[test]
3833    fn test_roundtrip_sliced_list_view_of_dict() {
3834        #[allow(deprecated)]
3835        let list_data_type = DataType::ListView(Arc::new(Field::new_dict(
3836            "item",
3837            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3838            true,
3839            3,
3840            false,
3841        )));
3842
3843        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3844        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2, 1, 0, 3, 2, 1]);
3845        let dict_array = DictionaryArray::new(keys, Arc::new(values));
3846        let dict_data = dict_array.to_data();
3847
3848        let offsets: &[i32; 7] = &[0, 2, 4, 4, 7, 9, 12];
3849        let sizes: &[i32; 6] = &[2, 2, 0, 3, 2, 3];
3850        let value_offsets = Buffer::from_slice_ref(offsets);
3851        let value_sizes = Buffer::from_slice_ref(sizes);
3852
3853        let list_data = ArrayData::builder(list_data_type)
3854            .len(6)
3855            .add_buffer(value_offsets)
3856            .add_buffer(value_sizes)
3857            .add_child_data(dict_data)
3858            .build()
3859            .unwrap();
3860        let list_view_array = GenericListViewArray::<i32>::from(list_data);
3861
3862        let schema = Arc::new(Schema::new(vec![Field::new(
3863            "f1",
3864            list_view_array.data_type().clone(),
3865            false,
3866        )]));
3867        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(list_view_array)]).unwrap();
3868
3869        let sliced_batch = input_batch.slice(1, 4);
3870
3871        let output_batch = deserialize_file(serialize_file(&sliced_batch));
3872        assert_eq!(sliced_batch, output_batch);
3873
3874        let output_batch = deserialize_stream(serialize_stream(&sliced_batch));
3875        assert_eq!(sliced_batch, output_batch);
3876    }
3877
3878    #[test]
3879    fn test_roundtrip_dense_union_of_dict() {
3880        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3881        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
3882        let dict_array = DictionaryArray::new(keys, Arc::new(values));
3883
3884        #[allow(deprecated)]
3885        let dict_field = Arc::new(Field::new_dict(
3886            "dict",
3887            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3888            true,
3889            1,
3890            false,
3891        ));
3892        let int_field = Arc::new(Field::new("int", DataType::Int32, false));
3893        let union_fields = UnionFields::try_new(vec![0, 1], vec![dict_field, int_field]).unwrap();
3894
3895        let types = ScalarBuffer::from(vec![0i8, 0, 1, 0, 1, 0, 0]);
3896        let offsets = ScalarBuffer::from(vec![0i32, 1, 0, 2, 1, 3, 4]);
3897
3898        let int_array = Int32Array::from(vec![100, 200]);
3899
3900        let union = UnionArray::try_new(
3901            union_fields.clone(),
3902            types,
3903            Some(offsets),
3904            vec![Arc::new(dict_array), Arc::new(int_array)],
3905        )
3906        .unwrap();
3907
3908        let schema = Arc::new(Schema::new(vec![Field::new(
3909            "union",
3910            DataType::Union(union_fields, UnionMode::Dense),
3911            false,
3912        )]));
3913        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
3914
3915        let output_batch = deserialize_file(serialize_file(&input_batch));
3916        assert_eq!(input_batch, output_batch);
3917
3918        let output_batch = deserialize_stream(serialize_stream(&input_batch));
3919        assert_eq!(input_batch, output_batch);
3920    }
3921
3922    #[test]
3923    fn test_roundtrip_sparse_union_of_dict() {
3924        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3925        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
3926        let dict_array = DictionaryArray::new(keys, Arc::new(values));
3927
3928        #[allow(deprecated)]
3929        let dict_field = Arc::new(Field::new_dict(
3930            "dict",
3931            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3932            true,
3933            2,
3934            false,
3935        ));
3936        let int_field = Arc::new(Field::new("int", DataType::Int32, false));
3937        let union_fields = UnionFields::try_new(vec![0, 1], vec![dict_field, int_field]).unwrap();
3938
3939        let types = ScalarBuffer::from(vec![0i8, 0, 1, 0, 1, 0, 0]);
3940
3941        let int_array = Int32Array::from(vec![0, 0, 100, 0, 200, 0, 0]);
3942
3943        let union = UnionArray::try_new(
3944            union_fields.clone(),
3945            types,
3946            None,
3947            vec![Arc::new(dict_array), Arc::new(int_array)],
3948        )
3949        .unwrap();
3950
3951        let schema = Arc::new(Schema::new(vec![Field::new(
3952            "union",
3953            DataType::Union(union_fields, UnionMode::Sparse),
3954            false,
3955        )]));
3956        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
3957
3958        let output_batch = deserialize_file(serialize_file(&input_batch));
3959        assert_eq!(input_batch, output_batch);
3960
3961        let output_batch = deserialize_stream(serialize_stream(&input_batch));
3962        assert_eq!(input_batch, output_batch);
3963    }
3964
3965    #[test]
3966    fn test_roundtrip_map_with_dict_keys() {
3967        // Building a map array is a bit involved. We first build a struct arary that has a key and
3968        // value field and then use that to build the actual map array.
3969        let key_values = StringArray::from(vec!["key_a", "key_b", "key_c"]);
3970        let keys = Int32Array::from_iter_values([0, 1, 2, 0, 1, 0]);
3971        let dict_keys = DictionaryArray::new(keys, Arc::new(key_values));
3972
3973        let values = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
3974
3975        #[allow(deprecated)]
3976        let entries_field = Arc::new(Field::new(
3977            "entries",
3978            DataType::Struct(
3979                vec![
3980                    Field::new_dict(
3981                        "key",
3982                        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3983                        false,
3984                        1,
3985                        false,
3986                    ),
3987                    Field::new("value", DataType::Int32, true),
3988                ]
3989                .into(),
3990            ),
3991            false,
3992        ));
3993
3994        let entries = StructArray::from(vec![
3995            (
3996                Arc::new(Field::new(
3997                    "key",
3998                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3999                    false,
4000                )),
4001                Arc::new(dict_keys) as ArrayRef,
4002            ),
4003            (
4004                Arc::new(Field::new("value", DataType::Int32, true)),
4005                Arc::new(values) as ArrayRef,
4006            ),
4007        ]);
4008
4009        let offsets = Buffer::from_slice_ref([0i32, 2, 4, 6]);
4010
4011        let map_data = ArrayData::builder(DataType::Map(entries_field, false))
4012            .len(3)
4013            .add_buffer(offsets)
4014            .add_child_data(entries.into_data())
4015            .build()
4016            .unwrap();
4017        let map_array = MapArray::from(map_data);
4018
4019        let schema = Arc::new(Schema::new(vec![Field::new(
4020            "map",
4021            map_array.data_type().clone(),
4022            false,
4023        )]));
4024        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(map_array)]).unwrap();
4025
4026        let output_batch = deserialize_file(serialize_file(&input_batch));
4027        assert_eq!(input_batch, output_batch);
4028
4029        let output_batch = deserialize_stream(serialize_stream(&input_batch));
4030        assert_eq!(input_batch, output_batch);
4031    }
4032
4033    #[test]
4034    fn test_roundtrip_map_with_dict_values() {
4035        // Building a map array is a bit involved. We first build a struct arary that has a key and
4036        // value field and then use that to build the actual map array.
4037        let keys = StringArray::from(vec!["a", "b", "c", "d", "e", "f"]);
4038
4039        let value_values = StringArray::from(vec!["val_x", "val_y", "val_z"]);
4040        let value_keys = Int32Array::from_iter_values([0, 1, 2, 0, 1, 0]);
4041        let dict_values = DictionaryArray::new(value_keys, Arc::new(value_values));
4042
4043        #[allow(deprecated)]
4044        let entries_field = Arc::new(Field::new(
4045            "entries",
4046            DataType::Struct(
4047                vec![
4048                    Field::new("key", DataType::Utf8, false),
4049                    Field::new_dict(
4050                        "value",
4051                        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
4052                        true,
4053                        2,
4054                        false,
4055                    ),
4056                ]
4057                .into(),
4058            ),
4059            false,
4060        ));
4061
4062        let entries = StructArray::from(vec![
4063            (
4064                Arc::new(Field::new("key", DataType::Utf8, false)),
4065                Arc::new(keys) as ArrayRef,
4066            ),
4067            (
4068                Arc::new(Field::new(
4069                    "value",
4070                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
4071                    true,
4072                )),
4073                Arc::new(dict_values) as ArrayRef,
4074            ),
4075        ]);
4076
4077        let offsets = Buffer::from_slice_ref([0i32, 2, 4, 6]);
4078
4079        let map_data = ArrayData::builder(DataType::Map(entries_field, false))
4080            .len(3)
4081            .add_buffer(offsets)
4082            .add_child_data(entries.into_data())
4083            .build()
4084            .unwrap();
4085        let map_array = MapArray::from(map_data);
4086
4087        let schema = Arc::new(Schema::new(vec![Field::new(
4088            "map",
4089            map_array.data_type().clone(),
4090            false,
4091        )]));
4092        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(map_array)]).unwrap();
4093
4094        let output_batch = deserialize_file(serialize_file(&input_batch));
4095        assert_eq!(input_batch, output_batch);
4096
4097        let output_batch = deserialize_stream(serialize_stream(&input_batch));
4098        assert_eq!(input_batch, output_batch);
4099    }
4100
4101    #[test]
4102    fn test_decimal128_alignment16_is_sufficient() {
4103        const IPC_ALIGNMENT: usize = 16;
4104
4105        // Test a bunch of different dimensions to ensure alignment is never an issue.
4106        // For example, if we only test `num_cols = 1` then even with alignment 8 this
4107        // test would _happen_ to pass, even though for different dimensions like
4108        // `num_cols = 2` it would fail.
4109        for num_cols in [1, 2, 3, 17, 50, 73, 99] {
4110            let num_rows = (num_cols * 7 + 11) % 100; // Deterministic swizzle
4111
4112            let mut fields = Vec::new();
4113            let mut arrays = Vec::new();
4114            for i in 0..num_cols {
4115                let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
4116                let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
4117                fields.push(field);
4118                arrays.push(Arc::new(array) as Arc<dyn Array>);
4119            }
4120            let schema = Schema::new(fields);
4121            let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();
4122
4123            let mut writer = FileWriter::try_new_with_options(
4124                Vec::new(),
4125                batch.schema_ref(),
4126                IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
4127            )
4128            .unwrap();
4129            writer.write(&batch).unwrap();
4130            writer.finish().unwrap();
4131
4132            let out: Vec<u8> = writer.into_inner().unwrap();
4133
4134            let buffer = Buffer::from_vec(out);
4135            let trailer_start = buffer.len() - 10;
4136            let footer_len =
4137                read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
4138            let footer =
4139                root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
4140
4141            let schema = fb_to_schema(footer.schema().unwrap());
4142
4143            // Importantly we set `require_alignment`, checking that 16-byte alignment is sufficient
4144            // for `read_record_batch` later on to read the data in a zero-copy manner.
4145            let decoder =
4146                FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);
4147
4148            let batches = footer.recordBatches().unwrap();
4149
4150            let block = batches.get(0);
4151            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
4152            let data = buffer.slice_with_length(block.offset() as _, block_len);
4153
4154            let batch2 = decoder.read_record_batch(block, &data).unwrap().unwrap();
4155
4156            assert_eq!(batch, batch2);
4157        }
4158    }
4159
4160    #[test]
4161    fn test_decimal128_alignment8_is_unaligned() {
4162        const IPC_ALIGNMENT: usize = 8;
4163
4164        let num_cols = 2;
4165        let num_rows = 1;
4166
4167        let mut fields = Vec::new();
4168        let mut arrays = Vec::new();
4169        for i in 0..num_cols {
4170            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
4171            let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
4172            fields.push(field);
4173            arrays.push(Arc::new(array) as Arc<dyn Array>);
4174        }
4175        let schema = Schema::new(fields);
4176        let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();
4177
4178        let mut writer = FileWriter::try_new_with_options(
4179            Vec::new(),
4180            batch.schema_ref(),
4181            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
4182        )
4183        .unwrap();
4184        writer.write(&batch).unwrap();
4185        writer.finish().unwrap();
4186
4187        let out: Vec<u8> = writer.into_inner().unwrap();
4188
4189        let buffer = Buffer::from_vec(out);
4190        let trailer_start = buffer.len() - 10;
4191        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
4192        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
4193        let schema = fb_to_schema(footer.schema().unwrap());
4194
4195        // Importantly we set `require_alignment`, otherwise the error later is suppressed due to copying
4196        // to an aligned buffer in `ArrayDataBuilder.build_aligned`.
4197        let decoder =
4198            FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);
4199
4200        let batches = footer.recordBatches().unwrap();
4201
4202        let block = batches.get(0);
4203        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
4204        let data = buffer.slice_with_length(block.offset() as _, block_len);
4205
4206        let result = decoder.read_record_batch(block, &data);
4207
4208        let error = result.unwrap_err();
4209        assert_eq!(
4210            error.to_string(),
4211            "Invalid argument error: Misaligned buffers[0] in array of type Decimal128(38, 10), \
4212             offset from expected alignment of 16 by 8"
4213        );
4214    }
4215
    #[test]
    fn test_flush() {
        // Verifies that `flush()` on the IPC stream and file writers pushes buffered bytes
        // through an inner `BufWriter` to the underlying `Vec<u8>`.
        //
        // We write a schema which is small enough to fit into a buffer and not get flushed,
        // and then force the write with .flush().
        let num_cols = 2;
        let mut fields = Vec::new();
        let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
        for i in 0..num_cols {
            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
            fields.push(field);
        }
        let schema = Schema::new(fields);
        // 1024-byte buffers are intended to be large enough to hold the schema message
        // without auto-flushing; the first two asserts below check that assumption.
        let inner_stream_writer = BufWriter::with_capacity(1024, Vec::new());
        let inner_file_writer = BufWriter::with_capacity(1024, Vec::new());
        let mut stream_writer =
            StreamWriter::try_new_with_options(inner_stream_writer, &schema, options.clone())
                .unwrap();
        let mut file_writer =
            FileWriter::try_new_with_options(inner_file_writer, &schema, options).unwrap();

        // Snapshot how many bytes reached each underlying Vec right after construction,
        // then again after an explicit flush.
        let stream_bytes_written_on_new = stream_writer.get_ref().get_ref().len();
        let file_bytes_written_on_new = file_writer.get_ref().get_ref().len();
        stream_writer.flush().unwrap();
        file_writer.flush().unwrap();
        let stream_bytes_written_on_flush = stream_writer.get_ref().get_ref().len();
        let file_bytes_written_on_flush = file_writer.get_ref().get_ref().len();
        // Unwrap the IPC writer and then the BufWriter to get the complete stream bytes.
        // NOTE(review): the 8-byte accounting below assumes the IPC writer's `into_inner`
        // finishes the stream; confirm against `StreamWriter::into_inner`.
        let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
        // Finishing a stream writes the continuation bytes in MetadataVersion::V5 (4 bytes)
        // and then a length of 0 (4 bytes) for a total of 8 bytes.
        // Everything before that should have been flushed in the .flush() call.
        let expected_stream_flushed_bytes = stream_out.len() - 8;
        // A file write is the same as the stream write except for the leading magic string
        // ARROW1 plus padding, which is 8 bytes.
        let expected_file_flushed_bytes = expected_stream_flushed_bytes + 8;

        // Guard: if construction already pushed everything through, flush proves nothing.
        assert!(
            stream_bytes_written_on_new < stream_bytes_written_on_flush,
            "this test makes no sense if flush is not actually required"
        );
        assert!(
            file_bytes_written_on_new < file_bytes_written_on_flush,
            "this test makes no sense if flush is not actually required"
        );
        assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
        assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
    }
4262
4263    #[test]
4264    fn test_roundtrip_list_of_fixed_list() -> Result<(), ArrowError> {
4265        let l1_type =
4266            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, false)), 3);
4267        let l2_type = DataType::List(Arc::new(Field::new("item", l1_type.clone(), false)));
4268
4269        let l0_builder = Float32Builder::new();
4270        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3).with_field(Arc::new(Field::new(
4271            "item",
4272            DataType::Float32,
4273            false,
4274        )));
4275        let mut l2_builder =
4276            ListBuilder::new(l1_builder).with_field(Arc::new(Field::new("item", l1_type, false)));
4277
4278        for point in [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]] {
4279            l2_builder.values().values().append_value(point[0]);
4280            l2_builder.values().values().append_value(point[1]);
4281            l2_builder.values().values().append_value(point[2]);
4282
4283            l2_builder.values().append(true);
4284        }
4285        l2_builder.append(true);
4286
4287        let point = [10., 11., 12.];
4288        l2_builder.values().values().append_value(point[0]);
4289        l2_builder.values().values().append_value(point[1]);
4290        l2_builder.values().values().append_value(point[2]);
4291
4292        l2_builder.values().append(true);
4293        l2_builder.append(true);
4294
4295        let array = Arc::new(l2_builder.finish()) as ArrayRef;
4296
4297        let schema = Arc::new(Schema::new_with_metadata(
4298            vec![Field::new("points", l2_type, false)],
4299            HashMap::default(),
4300        ));
4301
4302        // Test a variety of combinations that include 0 and non-zero offsets
4303        // and also portions or the rest of the array
4304        test_slices(&array, &schema, 0, 1)?;
4305        test_slices(&array, &schema, 0, 2)?;
4306        test_slices(&array, &schema, 1, 1)?;
4307
4308        Ok(())
4309    }
4310
4311    #[test]
4312    fn test_roundtrip_list_of_fixed_list_w_nulls() -> Result<(), ArrowError> {
4313        let l0_builder = Float32Builder::new();
4314        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3);
4315        let mut l2_builder = ListBuilder::new(l1_builder);
4316
4317        for point in [
4318            [Some(1.0), Some(2.0), None],
4319            [Some(4.0), Some(5.0), Some(6.0)],
4320            [None, Some(8.0), Some(9.0)],
4321        ] {
4322            for p in point {
4323                match p {
4324                    Some(p) => l2_builder.values().values().append_value(p),
4325                    None => l2_builder.values().values().append_null(),
4326                }
4327            }
4328
4329            l2_builder.values().append(true);
4330        }
4331        l2_builder.append(true);
4332
4333        let point = [Some(10.), None, None];
4334        for p in point {
4335            match p {
4336                Some(p) => l2_builder.values().values().append_value(p),
4337                None => l2_builder.values().values().append_null(),
4338            }
4339        }
4340
4341        l2_builder.values().append(true);
4342        l2_builder.append(true);
4343
4344        let array = Arc::new(l2_builder.finish()) as ArrayRef;
4345
4346        let schema = Arc::new(Schema::new_with_metadata(
4347            vec![Field::new(
4348                "points",
4349                DataType::List(Arc::new(Field::new(
4350                    "item",
4351                    DataType::FixedSizeList(
4352                        Arc::new(Field::new("item", DataType::Float32, true)),
4353                        3,
4354                    ),
4355                    true,
4356                ))),
4357                true,
4358            )],
4359            HashMap::default(),
4360        ));
4361
4362        // Test a variety of combinations that include 0 and non-zero offsets
4363        // and also portions or the rest of the array
4364        test_slices(&array, &schema, 0, 1)?;
4365        test_slices(&array, &schema, 0, 2)?;
4366        test_slices(&array, &schema, 1, 1)?;
4367
4368        Ok(())
4369    }
4370
4371    fn test_slices(
4372        parent_array: &ArrayRef,
4373        schema: &SchemaRef,
4374        offset: usize,
4375        length: usize,
4376    ) -> Result<(), ArrowError> {
4377        let subarray = parent_array.slice(offset, length);
4378        let original_batch = RecordBatch::try_new(schema.clone(), vec![subarray])?;
4379
4380        let mut bytes = Vec::new();
4381        let mut writer = StreamWriter::try_new(&mut bytes, schema)?;
4382        writer.write(&original_batch)?;
4383        writer.finish()?;
4384
4385        let mut cursor = std::io::Cursor::new(bytes);
4386        let mut reader = StreamReader::try_new(&mut cursor, None)?;
4387        let returned_batch = reader.next().unwrap()?;
4388
4389        assert_eq!(original_batch, returned_batch);
4390
4391        Ok(())
4392    }
4393
4394    #[test]
4395    fn test_roundtrip_fixed_list() -> Result<(), ArrowError> {
4396        let int_builder = Int64Builder::new();
4397        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3)
4398            .with_field(Arc::new(Field::new("item", DataType::Int64, false)));
4399
4400        for point in [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]] {
4401            fixed_list_builder.values().append_value(point[0]);
4402            fixed_list_builder.values().append_value(point[1]);
4403            fixed_list_builder.values().append_value(point[2]);
4404
4405            fixed_list_builder.append(true);
4406        }
4407
4408        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;
4409
4410        let schema = Arc::new(Schema::new_with_metadata(
4411            vec![Field::new(
4412                "points",
4413                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, false)), 3),
4414                false,
4415            )],
4416            HashMap::default(),
4417        ));
4418
4419        // Test a variety of combinations that include 0 and non-zero offsets
4420        // and also portions or the rest of the array
4421        test_slices(&array, &schema, 0, 4)?;
4422        test_slices(&array, &schema, 0, 2)?;
4423        test_slices(&array, &schema, 1, 3)?;
4424        test_slices(&array, &schema, 2, 1)?;
4425
4426        Ok(())
4427    }
4428
4429    #[test]
4430    fn test_roundtrip_fixed_list_w_nulls() -> Result<(), ArrowError> {
4431        let int_builder = Int64Builder::new();
4432        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3);
4433
4434        for point in [
4435            [Some(1), Some(2), None],
4436            [Some(4), Some(5), Some(6)],
4437            [None, Some(8), Some(9)],
4438            [Some(10), None, None],
4439        ] {
4440            for p in point {
4441                match p {
4442                    Some(p) => fixed_list_builder.values().append_value(p),
4443                    None => fixed_list_builder.values().append_null(),
4444                }
4445            }
4446
4447            fixed_list_builder.append(true);
4448        }
4449
4450        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;
4451
4452        let schema = Arc::new(Schema::new_with_metadata(
4453            vec![Field::new(
4454                "points",
4455                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 3),
4456                true,
4457            )],
4458            HashMap::default(),
4459        ));
4460
4461        // Test a variety of combinations that include 0 and non-zero offsets
4462        // and also portions or the rest of the array
4463        test_slices(&array, &schema, 0, 4)?;
4464        test_slices(&array, &schema, 0, 2)?;
4465        test_slices(&array, &schema, 1, 3)?;
4466        test_slices(&array, &schema, 2, 1)?;
4467
4468        Ok(())
4469    }
4470
4471    #[test]
4472    fn test_metadata_encoding_ordering() {
4473        fn create_hash() -> u64 {
4474            let metadata: HashMap<String, String> = [
4475                ("a", "1"), //
4476                ("b", "2"), //
4477                ("c", "3"), //
4478                ("d", "4"), //
4479                ("e", "5"), //
4480            ]
4481            .into_iter()
4482            .map(|(k, v)| (k.to_owned(), v.to_owned()))
4483            .collect();
4484
4485            // Set metadata on both the schema and a field within it.
4486            let schema = Arc::new(
4487                Schema::new(vec![
4488                    Field::new("a", DataType::Int64, true).with_metadata(metadata.clone()),
4489                ])
4490                .with_metadata(metadata)
4491                .clone(),
4492            );
4493            let batch = RecordBatch::new_empty(schema.clone());
4494
4495            let mut bytes = Vec::new();
4496            let mut w = StreamWriter::try_new(&mut bytes, batch.schema_ref()).unwrap();
4497            w.write(&batch).unwrap();
4498            w.finish().unwrap();
4499
4500            let mut h = std::hash::DefaultHasher::new();
4501            h.write(&bytes);
4502            h.finish()
4503        }
4504
4505        let expected = create_hash();
4506
4507        // Since there is randomness in the HashMap and we cannot specify our
4508        // own Hasher for the implementation used for metadata, run the above
4509        // code 20x and verify it does not change. This is not perfect but it
4510        // should be good enough.
4511        let all_passed = (0..20).all(|_| create_hash() == expected);
4512        assert!(all_passed);
4513    }
4514
4515    #[test]
4516    fn test_dictionary_tracker_reset() {
4517        let data_gen = IpcDataGenerator::default();
4518        let mut dictionary_tracker = DictionaryTracker::new(false);
4519        let writer_options = IpcWriteOptions::default();
4520        let mut compression_ctx = IpcWriteContext::default();
4521
4522        let schema = Arc::new(Schema::new(vec![Field::new(
4523            "a",
4524            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
4525            false,
4526        )]));
4527
4528        let mut write_single_batch_stream =
4529            |batch: RecordBatch, dict_tracker: &mut DictionaryTracker| -> Vec<u8> {
4530                let mut buffer = Vec::new();
4531
4532                // create a new IPC stream:
4533                let stream_header = data_gen.schema_to_bytes_with_dictionary_tracker(
4534                    &schema,
4535                    dict_tracker,
4536                    &writer_options,
4537                );
4538                _ = write_message(&mut buffer, stream_header, &writer_options).unwrap();
4539
4540                let (encoded_dicts, encoded_batch) = data_gen
4541                    .encode(&batch, dict_tracker, &writer_options, &mut compression_ctx)
4542                    .unwrap();
4543                for encoded_dict in encoded_dicts {
4544                    _ = write_message(&mut buffer, encoded_dict, &writer_options).unwrap();
4545                }
4546                _ = write_message(&mut buffer, encoded_batch, &writer_options).unwrap();
4547
4548                buffer
4549            };
4550
4551        let batch1 = RecordBatch::try_new(
4552            schema.clone(),
4553            vec![Arc::new(DictionaryArray::new(
4554                UInt8Array::from_iter_values([0]),
4555                Arc::new(StringArray::from_iter_values(["a"])),
4556            ))],
4557        )
4558        .unwrap();
4559        let buffer = write_single_batch_stream(batch1.clone(), &mut dictionary_tracker);
4560
4561        // ensure we can read the stream back
4562        let mut reader = StreamReader::try_new(Cursor::new(buffer), None).unwrap();
4563        let read_batch = reader.next().unwrap().unwrap();
4564        assert_eq!(read_batch, batch1);
4565
4566        // reset the dictionary tracker so it can be used for next stream
4567        dictionary_tracker.clear();
4568
4569        // now write a 2nd stream and ensure we can also read it:
4570        let batch2 = RecordBatch::try_new(
4571            schema.clone(),
4572            vec![Arc::new(DictionaryArray::new(
4573                UInt8Array::from_iter_values([0]),
4574                Arc::new(StringArray::from_iter_values(["a"])),
4575            ))],
4576        )
4577        .unwrap();
4578        let buffer = write_single_batch_stream(batch2.clone(), &mut dictionary_tracker);
4579        let mut reader = StreamReader::try_new(Cursor::new(buffer), None).unwrap();
4580        let read_batch = reader.next().unwrap().unwrap();
4581        assert_eq!(read_batch, batch2);
4582    }
4583}