Skip to main content

arrow_ipc/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Arrow IPC File and Stream Writers
19//!
20//! # Notes
21//!
22//! [`FileWriter`] and [`StreamWriter`] have similar interfaces,
23//! however the [`FileWriter`] expects a reader that supports [`Seek`]ing
24//!
25//! [`Seek`]: std::io::Seek
26
27use std::cmp::min;
28use std::collections::HashMap;
29use std::io::{BufWriter, Write};
30use std::mem::size_of;
31use std::sync::Arc;
32
33use flatbuffers::FlatBufferBuilder;
34
35use arrow_array::builder::BufferBuilder;
36use arrow_array::cast::*;
37use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
38use arrow_array::*;
39use arrow_buffer::bit_util;
40use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ToByteSlice};
41use arrow_data::{ArrayData, ArrayDataBuilder, BufferSpec, layout};
42use arrow_schema::*;
43
44use crate::CONTINUATION_MARKER;
45use crate::compression::CompressionCodec;
46#[expect(deprecated)]
47pub use crate::compression::{CompressionContext, IpcWriteContext};
48use crate::convert::IpcSchemaEncoder;
49
/// IPC write options used to control the behaviour of the [`IpcDataGenerator`]
///
/// Build an instance with [`IpcWriteOptions::try_new`] or [`Default::default`] and refine it
/// with the `try_with_*` / `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct IpcWriteOptions {
    /// Write padding after memory buffers to this multiple of bytes.
    /// Must be 8, 16, 32, or 64 - defaults to 64.
    alignment: u8,
    /// The legacy format is for releases before 0.15.0, and uses metadata V4
    write_legacy_ipc_format: bool,
    /// The metadata version to write. The Rust IPC writer supports V4+
    ///
    /// *Default versions per crate*
    ///
    /// When creating the default IpcWriteOptions, the following metadata versions are used:
    ///
    /// version 2.0.0: V4, with legacy format enabled
    /// version 4.0.0: V5
    metadata_version: crate::MetadataVersion,
    /// Compression, if desired. Will result in a runtime error
    /// if the corresponding feature is not enabled
    batch_compression_type: Option<crate::CompressionType>,
    /// Compression level handed to the codec, if any. With `None` the codec is
    /// constructed without an explicit level.
    batch_compression_level: Option<i32>,
    /// How to handle updating dictionaries in IPC messages
    dictionary_handling: DictionaryHandling,
}
75
/// A single buffer segment ready to be written to the output stream.
///
/// For the uncompressed path the original Arc-backed [`Buffer`] is stored
/// directly (zero copy). For the compressed path the compressed bytes are
/// owned by a scratch `Vec<u8>`.
///
/// Neither variant carries alignment padding; callers add padding when the
/// segment is written out.
enum EncodedBuffer {
    /// Uncompressed: Arc-backed reference to the original array buffer.
    Raw(Buffer),
    /// Compressed: owned scratch bytes produced by the codec.
    Compressed(Vec<u8>),
}
87
88impl EncodedBuffer {
89    fn as_slice(&self) -> &[u8] {
90        match self {
91            EncodedBuffer::Raw(b) => b.as_slice(),
92            EncodedBuffer::Compressed(v) => v.as_slice(),
93        }
94    }
95
96    fn len(&self) -> usize {
97        match self {
98            EncodedBuffer::Raw(b) => b.len(),
99            EncodedBuffer::Compressed(v) => v.len(),
100        }
101    }
102}
/// Accumulates the IPC metadata produced by [`write_array_data`].
///
/// `nodes` and `buffers` are serialised into the flatbuffer `RecordBatch` (or `DictionaryBatch`)
/// header. The companion [`IpcBodySink`] holds the actual encoded bytes.
#[derive(Default)]
struct IpcMetadataBuilder {
    /// Field node entries, written into the header's `nodes` vector.
    nodes: Vec<crate::FieldNode>,
    /// Buffer descriptors, written into the header's `buffers` vector.
    buffers: Vec<crate::Buffer>,
}
112
/// Destination for the raw Arrow data bytes (the IPC message body) produced by [`write_array_data`].
///
/// The companion [`IpcMetadataBuilder`] accumulates the flatbuffer metadata
/// (offset + length of each buffer in the body); together they form a complete IPC message.
///
/// The two variants differ in who applies alignment padding: `Write` appends it to the
/// byte vec itself, while `Collect` stores unpadded segments.
enum IpcBodySink<'a> {
    /// Serialize buffer bytes (with padding) into a contiguous byte vec.
    Write(&'a mut Vec<u8>),
    /// Accumulate pre-encoded buffer segments for deferred zero-copy streaming.
    Collect(&'a mut Vec<EncodedBuffer>),
}
123impl<'a> IpcBodySink<'a> {
124    /// Writes the encoded buffer to the sink.
125    pub fn write(&mut self, pad_len: usize, buffer: EncodedBuffer) {
126        match self {
127            IpcBodySink::Write(vec) => {
128                vec.extend_from_slice(buffer.as_slice());
129                vec.extend_from_slice(&PADDING[..pad_len]);
130            }
131            IpcBodySink::Collect(vec) => {
132                vec.push(buffer);
133            }
134        }
135    }
136}
137
/// Per-message sizes produced by [`IpcDataGenerator::write`].
///
/// [`FileWriter`] uses these to build the Block index entries required by the IPC footer for
/// random-access reads.
///
/// All sizes are byte counts.
struct IpcWriteMetadata {
    /// Per-dictionary `(padded_header_len, body_len)` for each dictionary batch written
    /// before the record batch.
    dictionary_block_sizes: Vec<(usize, usize)>,
    /// Flatbuffer header size including continuation prefix and alignment padding.
    padded_header_len: usize,
    /// Total length of the record-batch body including trailing alignment padding.
    body_len: usize,
}
151
152impl IpcWriteOptions {
153    /// Configures compression when writing IPC files.
154    ///
155    /// Will result in a runtime error if the corresponding feature
156    /// is not enabled
157    pub fn try_with_compression(
158        mut self,
159        batch_compression_type: Option<crate::CompressionType>,
160    ) -> Result<Self, ArrowError> {
161        self.batch_compression_type = batch_compression_type;
162
163        if self.batch_compression_type.is_some()
164            && self.metadata_version < crate::MetadataVersion::V5
165        {
166            return Err(ArrowError::InvalidArgumentError(
167                "Compression only supported in metadata v5 and above".to_string(),
168            ));
169        }
170        Ok(self)
171    }
172
173    /// Configures the compression level used when writing compressed IPC batches.
174    ///
175    /// Compression levels require metadata V5 or newer and are currently only
176    /// supported for ZSTD compression.
177    pub fn try_with_compression_level(
178        mut self,
179        batch_compression_level: Option<i32>,
180    ) -> Result<Self, ArrowError> {
181        self.batch_compression_level = batch_compression_level;
182
183        if self.batch_compression_level.is_some()
184            && self.metadata_version < crate::MetadataVersion::V5
185        {
186            return Err(ArrowError::InvalidArgumentError(
187                "Compression only supported in metadata v5 and above".to_string(),
188            ));
189        }
190
191        match (self.batch_compression_type, self.batch_compression_level) {
192            (Some(crate::CompressionType::ZSTD), Some(level)) => {
193                return self.check_zstd_level(level);
194            }
195            (Some(crate::CompressionType::LZ4_FRAME), Some(_)) => {
196                return Err(ArrowError::InvalidArgumentError(
197                    "LZ4 Frame compression does not support configurable compression levels"
198                        .to_string(),
199                ));
200            }
201            _ => {}
202        }
203
204        Ok(self)
205    }
206
207    #[cfg(not(feature = "zstd"))]
208    fn check_zstd_level(self, _level: i32) -> Result<Self, ArrowError> {
209        Err(ArrowError::InvalidArgumentError(
210            "zstd IPC compression requires the zstd feature".to_string(),
211        ))
212    }
213
214    #[cfg(feature = "zstd")]
215    fn check_zstd_level(self, level: i32) -> Result<Self, ArrowError> {
216        let range = zstd::compression_level_range();
217        if !range.contains(&(level as zstd::zstd_safe::CompressionLevel)) {
218            return Err(ArrowError::InvalidArgumentError(format!(
219                "ZSTD compression level must be between {} and {}, got {}",
220                range.start(),
221                range.end(),
222                level,
223            )));
224        }
225
226        Ok(self)
227    }
228
229    /// Try to create IpcWriteOptions, checking for incompatible settings
230    pub fn try_new(
231        alignment: usize,
232        write_legacy_ipc_format: bool,
233        metadata_version: crate::MetadataVersion,
234    ) -> Result<Self, ArrowError> {
235        let is_alignment_valid =
236            alignment == 8 || alignment == 16 || alignment == 32 || alignment == 64;
237        if !is_alignment_valid {
238            return Err(ArrowError::InvalidArgumentError(
239                "Alignment should be 8, 16, 32, or 64.".to_string(),
240            ));
241        }
242        let alignment: u8 = u8::try_from(alignment).expect("range already checked");
243        match metadata_version {
244            crate::MetadataVersion::V1
245            | crate::MetadataVersion::V2
246            | crate::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
247                "Writing IPC metadata version 3 and lower not supported".to_string(),
248            )),
249            #[allow(deprecated)]
250            crate::MetadataVersion::V4 => Ok(Self {
251                alignment,
252                write_legacy_ipc_format,
253                metadata_version,
254                batch_compression_type: None,
255                batch_compression_level: None,
256                dictionary_handling: DictionaryHandling::default(),
257            }),
258            crate::MetadataVersion::V5 => {
259                if write_legacy_ipc_format {
260                    Err(ArrowError::InvalidArgumentError(
261                        "Legacy IPC format only supported on metadata version 4".to_string(),
262                    ))
263                } else {
264                    Ok(Self {
265                        alignment,
266                        write_legacy_ipc_format,
267                        metadata_version,
268                        batch_compression_type: None,
269                        batch_compression_level: None,
270                        dictionary_handling: DictionaryHandling::default(),
271                    })
272                }
273            }
274            z => Err(ArrowError::InvalidArgumentError(format!(
275                "Unsupported crate::MetadataVersion {z:?}"
276            ))),
277        }
278    }
279
280    /// Configure how dictionaries are handled in IPC messages
281    pub fn with_dictionary_handling(mut self, dictionary_handling: DictionaryHandling) -> Self {
282        self.dictionary_handling = dictionary_handling;
283        self
284    }
285}
286
287impl Default for IpcWriteOptions {
288    fn default() -> Self {
289        Self {
290            alignment: 64,
291            write_legacy_ipc_format: false,
292            metadata_version: crate::MetadataVersion::V5,
293            batch_compression_type: None,
294            batch_compression_level: None,
295            dictionary_handling: DictionaryHandling::default(),
296        }
297    }
298}
299
#[derive(Debug, Default)]
/// Handles low level details of encoding [`Array`] and [`Schema`] into the
/// [Arrow IPC Format].
///
/// The type has no fields; all state needed for encoding is passed to its methods.
///
/// # Example
/// ```
/// # fn run() {
/// # use std::sync::Arc;
/// # use arrow_array::UInt64Array;
/// # use arrow_array::RecordBatch;
/// # use arrow_ipc::writer::{IpcWriteContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
///
/// // Create a record batch
/// let batch = RecordBatch::try_from_iter(vec![
///  ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _)
/// ]).unwrap();
///
/// // Error if dictionary ids are replaced.
/// let error_on_replacement = true;
/// let options = IpcWriteOptions::default();
/// let mut dictionary_tracker = DictionaryTracker::new(error_on_replacement);
///
/// let mut ipc_write_context = IpcWriteContext::default();
///
/// // encode the batch into zero or more encoded dictionaries
/// // and the data for the actual array.
/// let data_gen = IpcDataGenerator::default();
/// let (encoded_dictionaries, encoded_message) = data_gen
///   .encode(&batch, &mut dictionary_tracker, &options, &mut ipc_write_context)
///   .unwrap();
/// # }
/// ```
///
/// [Arrow IPC Format]: https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc
pub struct IpcDataGenerator {}
335
336impl IpcDataGenerator {
337    /// Converts a schema to an IPC message along with `dictionary_tracker`
338    /// and returns it encoded inside [EncodedData] as a flatbuffer.
339    pub fn schema_to_bytes_with_dictionary_tracker(
340        &self,
341        schema: &Schema,
342        dictionary_tracker: &mut DictionaryTracker,
343        write_options: &IpcWriteOptions,
344    ) -> EncodedData {
345        let mut fbb = FlatBufferBuilder::new();
346        let schema = {
347            let fb = IpcSchemaEncoder::new()
348                .with_dictionary_tracker(dictionary_tracker)
349                .schema_to_fb_offset(&mut fbb, schema);
350            fb.as_union_value()
351        };
352
353        let mut message = crate::MessageBuilder::new(&mut fbb);
354        message.add_version(write_options.metadata_version);
355        message.add_header_type(crate::MessageHeader::Schema);
356        message.add_bodyLength(0);
357        message.add_header(schema);
358        // TODO: custom metadata
359        let data = message.finish();
360        fbb.finish(data, None);
361
362        let data = fbb.finished_data();
363        EncodedData {
364            ipc_message: data.to_vec(),
365            arrow_data: vec![],
366        }
367    }
368
369    fn _encode_dictionaries<I: Iterator<Item = i64>>(
370        &self,
371        column: &ArrayRef,
372        encoded_dictionaries: &mut Vec<EncodedData>,
373        dictionary_tracker: &mut DictionaryTracker,
374        write_options: &IpcWriteOptions,
375        dict_id: &mut I,
376        ipc_write_context: &mut IpcWriteContext,
377    ) -> Result<(), ArrowError> {
378        match column.data_type() {
379            DataType::Struct(fields) => {
380                let s = as_struct_array(column);
381                for (field, column) in fields.iter().zip(s.columns()) {
382                    self.encode_dictionaries(
383                        field,
384                        column,
385                        encoded_dictionaries,
386                        dictionary_tracker,
387                        write_options,
388                        dict_id,
389                        ipc_write_context,
390                    )?;
391                }
392            }
393            DataType::RunEndEncoded(_, values) => {
394                let data = column.to_data();
395                if data.child_data().len() != 2 {
396                    return Err(ArrowError::InvalidArgumentError(format!(
397                        "The run encoded array should have exactly two child arrays. Found {}",
398                        data.child_data().len()
399                    )));
400                }
401                // The run_ends array is not expected to be dictionary encoded. Hence encode dictionaries
402                // only for values array.
403                let values_array = make_array(data.child_data()[1].clone());
404                self.encode_dictionaries(
405                    values,
406                    &values_array,
407                    encoded_dictionaries,
408                    dictionary_tracker,
409                    write_options,
410                    dict_id,
411                    ipc_write_context,
412                )?;
413            }
414            DataType::List(field) => {
415                let list = as_list_array(column);
416                self.encode_dictionaries(
417                    field,
418                    list.values(),
419                    encoded_dictionaries,
420                    dictionary_tracker,
421                    write_options,
422                    dict_id,
423                    ipc_write_context,
424                )?;
425            }
426            DataType::LargeList(field) => {
427                let list = as_large_list_array(column);
428                self.encode_dictionaries(
429                    field,
430                    list.values(),
431                    encoded_dictionaries,
432                    dictionary_tracker,
433                    write_options,
434                    dict_id,
435                    ipc_write_context,
436                )?;
437            }
438            DataType::ListView(field) => {
439                let list = column.as_list_view::<i32>();
440                self.encode_dictionaries(
441                    field,
442                    list.values(),
443                    encoded_dictionaries,
444                    dictionary_tracker,
445                    write_options,
446                    dict_id,
447                    ipc_write_context,
448                )?;
449            }
450            DataType::LargeListView(field) => {
451                let list = column.as_list_view::<i64>();
452                self.encode_dictionaries(
453                    field,
454                    list.values(),
455                    encoded_dictionaries,
456                    dictionary_tracker,
457                    write_options,
458                    dict_id,
459                    ipc_write_context,
460                )?;
461            }
462            DataType::FixedSizeList(field, _) => {
463                let list = column
464                    .as_any()
465                    .downcast_ref::<FixedSizeListArray>()
466                    .expect("Unable to downcast to fixed size list array");
467                self.encode_dictionaries(
468                    field,
469                    list.values(),
470                    encoded_dictionaries,
471                    dictionary_tracker,
472                    write_options,
473                    dict_id,
474                    ipc_write_context,
475                )?;
476            }
477            DataType::Map(field, _) => {
478                let map_array = as_map_array(column);
479
480                let (keys, values) = match field.data_type() {
481                    DataType::Struct(fields) if fields.len() == 2 => (&fields[0], &fields[1]),
482                    _ => panic!("Incorrect field data type {:?}", field.data_type()),
483                };
484
485                // keys
486                self.encode_dictionaries(
487                    keys,
488                    map_array.keys(),
489                    encoded_dictionaries,
490                    dictionary_tracker,
491                    write_options,
492                    dict_id,
493                    ipc_write_context,
494                )?;
495
496                // values
497                self.encode_dictionaries(
498                    values,
499                    map_array.values(),
500                    encoded_dictionaries,
501                    dictionary_tracker,
502                    write_options,
503                    dict_id,
504                    ipc_write_context,
505                )?;
506            }
507            DataType::Union(fields, _) => {
508                let union = as_union_array(column);
509                for (type_id, field) in fields.iter() {
510                    let column = union.child(type_id);
511                    self.encode_dictionaries(
512                        field,
513                        column,
514                        encoded_dictionaries,
515                        dictionary_tracker,
516                        write_options,
517                        dict_id,
518                        ipc_write_context,
519                    )?;
520                }
521            }
522            _ => (),
523        }
524
525        Ok(())
526    }
527
528    #[allow(clippy::too_many_arguments)]
529    fn encode_dictionaries<I: Iterator<Item = i64>>(
530        &self,
531        field: &Field,
532        column: &ArrayRef,
533        encoded_dictionaries: &mut Vec<EncodedData>,
534        dictionary_tracker: &mut DictionaryTracker,
535        write_options: &IpcWriteOptions,
536        dict_id_seq: &mut I,
537        ipc_write_context: &mut IpcWriteContext,
538    ) -> Result<(), ArrowError> {
539        match column.data_type() {
540            DataType::Dictionary(_key_type, value_type) => {
541                if matches!(value_type.as_ref(), DataType::Dictionary(_, _)) {
542                    return Err(ArrowError::InvalidArgumentError(format!(
543                        "Arrow IPC field metadata cannot encode direct dictionary-of-dictionary values for field {:?}",
544                        field.name()
545                    )));
546                }
547
548                let dict_data = column.to_data();
549                let dict_values = &dict_data.child_data()[0];
550
551                let values = make_array(dict_data.child_data()[0].clone());
552
553                self._encode_dictionaries(
554                    &values,
555                    encoded_dictionaries,
556                    dictionary_tracker,
557                    write_options,
558                    dict_id_seq,
559                    ipc_write_context,
560                )?;
561
562                // It's important to only take the dict_id at this point, because the dict ID
563                // sequence is assigned depth-first, so we need to first encode children and have
564                // them take their assigned dict IDs before we take the dict ID for this field.
565                let dict_id = dict_id_seq.next().ok_or_else(|| {
566                    ArrowError::IpcError(format!(
567                        "no dict id for field {:?}: field.data_type={:?}, column.data_type={:?}",
568                        field.name(),
569                        field.data_type(),
570                        column.data_type()
571                    ))
572                })?;
573
574                match dictionary_tracker.insert_column(
575                    dict_id,
576                    column,
577                    write_options.dictionary_handling,
578                )? {
579                    DictionaryUpdate::None => {}
580                    DictionaryUpdate::New | DictionaryUpdate::Replaced => {
581                        encoded_dictionaries.push(self.dictionary_batch_to_bytes(
582                            dict_id,
583                            dict_values,
584                            write_options,
585                            false,
586                            ipc_write_context,
587                        )?);
588                    }
589                    DictionaryUpdate::Delta(data) => {
590                        encoded_dictionaries.push(self.dictionary_batch_to_bytes(
591                            dict_id,
592                            &data,
593                            write_options,
594                            true,
595                            ipc_write_context,
596                        )?);
597                    }
598                }
599            }
600            _ => self._encode_dictionaries(
601                column,
602                encoded_dictionaries,
603                dictionary_tracker,
604                write_options,
605                dict_id_seq,
606                ipc_write_context,
607            )?,
608        }
609
610        Ok(())
611    }
612
613    /// Encodes a batch to a number of [EncodedData] items (dictionary batches + the record batch).
614    /// The [DictionaryTracker] keeps track of dictionaries with new `dict_id`s  (so they are only sent once)
615    /// Make sure the [DictionaryTracker] is initialized at the start of the stream.
616    pub fn encode(
617        &self,
618        batch: &RecordBatch,
619        dictionary_tracker: &mut DictionaryTracker,
620        write_options: &IpcWriteOptions,
621        ipc_write_context: &mut IpcWriteContext,
622    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
623        let encoded_dictionaries =
624            self.encode_all_dicts(batch, dictionary_tracker, write_options, ipc_write_context)?;
625        let mut arrow_data = Vec::new();
626        let (ipc_message, _, tail_pad) = self.record_batch_to_bytes(
627            batch,
628            write_options,
629            ipc_write_context,
630            &mut IpcBodySink::Write(&mut arrow_data),
631        )?;
632        arrow_data.extend_from_slice(&PADDING[..tail_pad]);
633        Ok((
634            encoded_dictionaries,
635            EncodedData {
636                ipc_message,
637                arrow_data,
638            },
639        ))
640    }
641
642    /// Encode dictionary batches for all columns in `batch`.
643    fn encode_all_dicts(
644        &self,
645        batch: &RecordBatch,
646        dictionary_tracker: &mut DictionaryTracker,
647        write_options: &IpcWriteOptions,
648        ipc_write_context: &mut IpcWriteContext,
649    ) -> Result<Vec<EncodedData>, ArrowError> {
650        let schema = batch.schema();
651        let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len());
652        let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter();
653        for (i, field) in schema.fields().iter().enumerate() {
654            self.encode_dictionaries(
655                field,
656                batch.column(i),
657                &mut encoded_dictionaries,
658                dictionary_tracker,
659                write_options,
660                &mut dict_id,
661                ipc_write_context,
662            )?;
663        }
664        Ok(encoded_dictionaries)
665    }
666
667    /// Write dictionary batches and the record batch directly to `writer`, skipping the
668    /// intermediate body `Vec<u8>` allocations
669    /// Returns [`IpcWriteMetadata`] with the sizes needed to build footer blocks.
670    fn write<W: Write>(
671        &self,
672        batch: &RecordBatch,
673        dictionary_tracker: &mut DictionaryTracker,
674        write_options: &IpcWriteOptions,
675        ipc_write_context: &mut IpcWriteContext,
676        writer: &mut W,
677    ) -> Result<IpcWriteMetadata, ArrowError> {
678        let encoded_dictionaries =
679            self.encode_all_dicts(batch, dictionary_tracker, write_options, ipc_write_context)?;
680
681        let mut dictionary_block_sizes = Vec::with_capacity(encoded_dictionaries.len());
682        for dict in encoded_dictionaries {
683            dictionary_block_sizes.push(write_message(&mut *writer, dict, write_options)?);
684        }
685
686        let capacity = batch
687            .columns()
688            .iter()
689            .map(|a| estimate_encoded_buffer_count(a.data_type()))
690            .sum();
691        let mut encoded_buffers: Vec<EncodedBuffer> = Vec::with_capacity(capacity);
692        let (ipc_message, body_len, tail_pad) = self.record_batch_to_bytes(
693            batch,
694            write_options,
695            ipc_write_context,
696            &mut IpcBodySink::Collect(&mut encoded_buffers),
697        )?;
698
699        let alignment = write_options.alignment;
700        let a = usize::from(alignment - 1);
701        let prefix_size = if write_options.write_legacy_ipc_format {
702            4
703        } else {
704            8
705        };
706        let aligned_size = (ipc_message.len() + prefix_size + a) & !a;
707        write_continuation(
708            &mut *writer,
709            write_options,
710            (aligned_size - prefix_size) as i32,
711        )?;
712        writer.write_all(&ipc_message)?;
713        writer.write_all(&PADDING[..aligned_size - ipc_message.len() - prefix_size])?;
714        for enc in &encoded_buffers {
715            writer.write_all(enc.as_slice())?;
716            writer.write_all(&PADDING[..pad_to_alignment(alignment, enc.len())])?;
717        }
718        writer.write_all(&PADDING[..tail_pad])?;
719
720        Ok(IpcWriteMetadata {
721            dictionary_block_sizes,
722            padded_header_len: aligned_size,
723            body_len,
724        })
725    }
726
727    /// Encodes a batch to a number of [EncodedData] items (dictionary batches + the record batch).
728    /// The [DictionaryTracker] keeps track of dictionaries with new `dict_id`s  (so they are only sent once)
729    /// Make sure the [DictionaryTracker] is initialized at the start of the stream.
730    #[deprecated(since = "57.0.0", note = "Use `encode` instead")]
731    pub fn encoded_batch(
732        &self,
733        batch: &RecordBatch,
734        dictionary_tracker: &mut DictionaryTracker,
735        write_options: &IpcWriteOptions,
736    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
737        self.encode(
738            batch,
739            dictionary_tracker,
740            write_options,
741            &mut Default::default(),
742        )
743    }
744
    /// Encodes a `RecordBatch` into a flatbuffer IPC message and fills `sink` with the
    /// serialised buffer data.
    ///
    /// Returns `(ipc_message, body_len, tail_pad)`: the flatbuffer header bytes, the
    /// total body length including trailing padding, and the trailing alignment padding byte count.
    ///
    /// The trailing padding is only reported, not written: callers append `tail_pad`
    /// padding bytes after the buffers themselves.
    ///
    /// # Errors
    ///
    /// Fails if the compression codec cannot be constructed from the configured type and
    /// level, or if `write_array_data` fails for any column.
    fn record_batch_to_bytes(
        &self,
        batch: &RecordBatch,
        write_options: &IpcWriteOptions,
        ipc_write_context: &mut IpcWriteContext,
        sink: &mut IpcBodySink<'_>,
    ) -> Result<(Vec<u8>, usize, usize), ArrowError> {
        let batch_compression_type = write_options.batch_compression_type;

        // When compression is configured, build the `BodyCompression` table in the
        // context's flatbuffer builder up front; it is attached to the record batch below.
        let compression = batch_compression_type.map(|batch_compression_type| {
            let fbb = ipc_write_context.mut_fbb();
            let mut c = crate::BodyCompressionBuilder::new(fbb);
            c.add_method(crate::BodyCompressionMethod::BUFFER);
            c.add_codec(batch_compression_type);
            c.finish()
        });

        // Construct the codec, with the explicit level when one is configured.
        let batch_compression_level = write_options.batch_compression_level;
        let compression_codec: Option<CompressionCodec> = batch_compression_type
            .map(|compression_type| match batch_compression_level {
                Some(level) => {
                    CompressionCodec::try_new_with_compression_level(compression_type, level)
                }
                None => compression_type.try_into(),
            })
            .transpose()?;

        let alignment = write_options.alignment;
        let mut variadic_buffer_counts = vec![];
        let mut meta = IpcMetadataBuilder::default();
        // Threaded through `write_array_data`, which returns the updated value;
        // presumably the running byte offset into the body - confirm against that helper.
        let mut offset = 0i64;

        for array in batch.columns() {
            let array_data = array.to_data();
            offset = write_array_data(
                &array_data,
                &mut meta,
                sink,
                offset,
                compression_codec,
                ipc_write_context,
                write_options,
            )?;
            append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data);
        }

        // Padding needed after the last buffer; included in the reported body length.
        let tail_pad = pad_to_alignment(alignment, offset as usize);
        let body_len = offset as usize + tail_pad;

        let fbb = ipc_write_context.mut_fbb();
        let buffers = fbb.create_vector(&meta.buffers);
        let nodes = fbb.create_vector(&meta.nodes);
        // The variadic counts vector is only emitted when at least one count was collected.
        let variadic_buffer = if variadic_buffer_counts.is_empty() {
            None
        } else {
            Some(fbb.create_vector(&variadic_buffer_counts))
        };

        let root = {
            let mut batch_builder = crate::RecordBatchBuilder::new(fbb);
            batch_builder.add_length(batch.num_rows() as i64);
            batch_builder.add_nodes(nodes);
            batch_builder.add_buffers(buffers);
            if let Some(c) = compression {
                batch_builder.add_compression(c);
            }
            if let Some(v) = variadic_buffer {
                batch_builder.add_variadicBufferCounts(v);
            }
            batch_builder.finish().as_union_value()
        };
        // Wrap the record batch table in the outer `Message` table.
        let mut message = crate::MessageBuilder::new(fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::RecordBatch);
        message.add_bodyLength(body_len as i64);
        message.add_header(root);
        let root = message.finish();
        fbb.finish(root, None);

        // Copy the finished bytes out before resetting the builder owned by the context.
        let ipc_message = fbb.finished_data().to_vec();
        fbb.reset();
        Ok((ipc_message, body_len, tail_pad))
    }
833
834    /// Write dictionary values into two sets of bytes, one for the header (crate::Message) and the
835    /// other for the data
836    fn dictionary_batch_to_bytes(
837        &self,
838        dict_id: i64,
839        array_data: &ArrayData,
840        write_options: &IpcWriteOptions,
841        is_delta: bool,
842        ipc_write_context: &mut IpcWriteContext,
843    ) -> Result<EncodedData, ArrowError> {
844        let mut arrow_data: Vec<u8> = vec![];
845
846        // get the type of compression
847        let batch_compression_type = write_options.batch_compression_type;
848
849        let compression = batch_compression_type.map(|batch_compression_type| {
850            let fbb = ipc_write_context.mut_fbb();
851            let mut c = crate::BodyCompressionBuilder::new(fbb);
852            c.add_method(crate::BodyCompressionMethod::BUFFER);
853            c.add_codec(batch_compression_type);
854            c.finish()
855        });
856
857        let batch_compression_level = write_options.batch_compression_level;
858        let compression_codec: Option<CompressionCodec> = batch_compression_type
859            .map(|batch_compression_type| match batch_compression_level {
860                Some(level) => {
861                    CompressionCodec::try_new_with_compression_level(batch_compression_type, level)
862                }
863                None => batch_compression_type.try_into(),
864            })
865            .transpose()?;
866
867        let alignment = write_options.alignment;
868        let mut meta = IpcMetadataBuilder::default();
869        let mut sink = IpcBodySink::Write(&mut arrow_data);
870        let offset = write_array_data(
871            array_data,
872            &mut meta,
873            &mut sink,
874            0,
875            compression_codec,
876            ipc_write_context,
877            write_options,
878        )?;
879
880        let mut variadic_buffer_counts = vec![];
881        append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);
882
883        // pad the tail of body data
884        let tail_pad = pad_to_alignment(alignment, offset as usize);
885        let body_len = offset as usize + tail_pad;
886        arrow_data.extend_from_slice(&PADDING[..tail_pad]);
887
888        let fbb = ipc_write_context.mut_fbb();
889        let buffers = fbb.create_vector(&meta.buffers);
890        let nodes = fbb.create_vector(&meta.nodes);
891        let variadic_buffer = if variadic_buffer_counts.is_empty() {
892            None
893        } else {
894            Some(fbb.create_vector(&variadic_buffer_counts))
895        };
896
897        let root = {
898            let mut batch_builder = crate::RecordBatchBuilder::new(fbb);
899            batch_builder.add_length(array_data.len() as i64);
900            batch_builder.add_nodes(nodes);
901            batch_builder.add_buffers(buffers);
902            if let Some(c) = compression {
903                batch_builder.add_compression(c);
904            }
905            if let Some(v) = variadic_buffer {
906                batch_builder.add_variadicBufferCounts(v);
907            }
908            batch_builder.finish()
909        };
910
911        let root = {
912            let mut batch_builder = crate::DictionaryBatchBuilder::new(fbb);
913            batch_builder.add_id(dict_id);
914            batch_builder.add_data(root);
915            batch_builder.add_isDelta(is_delta);
916            batch_builder.finish().as_union_value()
917        };
918
919        let root = {
920            let mut message_builder = crate::MessageBuilder::new(fbb);
921            message_builder.add_version(write_options.metadata_version);
922            message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
923            message_builder.add_bodyLength(body_len as i64);
924            message_builder.add_header(root);
925            message_builder.finish()
926        };
927
928        fbb.finish(root, None);
929        let ipc_message = fbb.finished_data().to_vec();
930        fbb.reset();
931
932        Ok(EncodedData {
933            ipc_message,
934            arrow_data,
935        })
936    }
937}
938
939fn ensure_supported_ipc_schema(schema: &Schema) -> Result<(), ArrowError> {
940    schema
941        .fields()
942        .iter()
943        .try_for_each(|field| ensure_supported_ipc_data_type(field.name(), field.data_type()))
944}
945
946fn ensure_supported_ipc_data_type(
947    field_name: &str,
948    data_type: &DataType,
949) -> Result<(), ArrowError> {
950    match data_type {
951        DataType::Dictionary(_, value_type)
952            if matches!(value_type.as_ref(), DataType::Dictionary(_, _)) =>
953        {
954            Err(ArrowError::InvalidArgumentError(format!(
955                "Arrow IPC field metadata cannot encode direct dictionary-of-dictionary values for field {field_name:?}"
956            )))
957        }
958        DataType::Dictionary(_, value_type) => {
959            ensure_supported_ipc_data_type(field_name, value_type)
960        }
961        DataType::Struct(fields) => fields
962            .iter()
963            .try_for_each(|field| ensure_supported_ipc_data_type(field.name(), field.data_type())),
964        DataType::RunEndEncoded(_, field)
965        | DataType::List(field)
966        | DataType::LargeList(field)
967        | DataType::ListView(field)
968        | DataType::LargeListView(field)
969        | DataType::FixedSizeList(field, _)
970        | DataType::Map(field, _) => {
971            ensure_supported_ipc_data_type(field.name(), field.data_type())
972        }
973        DataType::Union(fields, _) => fields.iter().try_for_each(|(_, field)| {
974            ensure_supported_ipc_data_type(field.name(), field.data_type())
975        }),
976        _ => Ok(()),
977    }
978}
979
980fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
981    match array.data_type() {
982        DataType::BinaryView | DataType::Utf8View => {
983            // The spec documents the counts only includes the variadic buffers, not the view/null buffers.
984            // https://arrow.apache.org/docs/format/Columnar.html#variadic-buffers
985            counts.push(array.buffers().len() as i64 - 1);
986        }
987        DataType::Dictionary(_, _) => {
988            // Do nothing
989            // Dictionary types are handled in `encode_dictionaries`.
990        }
991        _ => {
992            for child in array.child_data() {
993                append_variadic_buffer_counts(counts, child)
994            }
995        }
996    }
997}
998
999pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError> {
1000    match arr.data_type() {
1001        DataType::RunEndEncoded(k, _) => match k.data_type() {
1002            DataType::Int16 => {
1003                Ok(into_zero_offset_run_array(RunArray::<Int16Type>::from(arr))?.into_data())
1004            }
1005            DataType::Int32 => {
1006                Ok(into_zero_offset_run_array(RunArray::<Int32Type>::from(arr))?.into_data())
1007            }
1008            DataType::Int64 => {
1009                Ok(into_zero_offset_run_array(RunArray::<Int64Type>::from(arr))?.into_data())
1010            }
1011            d => unreachable!("Unexpected data type {d}"),
1012        },
1013        d => Err(ArrowError::InvalidArgumentError(format!(
1014            "The given array is not a run array. Data type of given array: {d}"
1015        ))),
1016    }
1017}
1018
1019// Returns a `RunArray` with zero offset and length matching the last value
1020// in run_ends array.
1021fn into_zero_offset_run_array<R: RunEndIndexType>(
1022    run_array: RunArray<R>,
1023) -> Result<RunArray<R>, ArrowError> {
1024    let run_ends = run_array.run_ends();
1025    if run_ends.offset() == 0 && run_ends.max_value() == run_ends.len() {
1026        return Ok(run_array);
1027    }
1028
1029    // The physical index of original run_ends array from which the `ArrayData`is sliced.
1030    let start_physical_index = run_ends.get_start_physical_index();
1031
1032    // The physical index of original run_ends array until which the `ArrayData`is sliced.
1033    let end_physical_index = run_ends.get_end_physical_index();
1034
1035    let physical_length = end_physical_index - start_physical_index + 1;
1036
1037    // build new run_ends array by subtracting offset from run ends.
1038    let offset = R::Native::usize_as(run_ends.offset());
1039    let mut builder = BufferBuilder::<R::Native>::new(physical_length);
1040    for run_end_value in &run_ends.values()[start_physical_index..end_physical_index] {
1041        builder.append(run_end_value.sub_wrapping(offset));
1042    }
1043    builder.append(R::Native::from_usize(run_array.len()).unwrap());
1044    let new_run_ends = unsafe {
1045        // Safety:
1046        // The function builds a valid run_ends array and hence need not be validated.
1047        ArrayDataBuilder::new(R::DATA_TYPE)
1048            .len(physical_length)
1049            .add_buffer(builder.finish())
1050            .build_unchecked()
1051    };
1052
1053    // build new values by slicing physical indices.
1054    let new_values = run_array
1055        .values()
1056        .slice(start_physical_index, physical_length)
1057        .into_data();
1058
1059    let builder = ArrayDataBuilder::new(run_array.data_type().clone())
1060        .len(run_array.len())
1061        .add_child_data(new_run_ends)
1062        .add_child_data(new_values);
1063    let array_data = unsafe {
1064        // Safety:
1065        //  This function builds a valid run array and hence can skip validation.
1066        builder.build_unchecked()
1067    };
1068    Ok(array_data.into())
1069}
1070
/// Controls how dictionaries are handled in Arrow IPC messages
///
/// This value is passed to [`DictionaryTracker::insert_column`], where it decides how a
/// dictionary that only gained new trailing values is reported.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DictionaryHandling {
    /// Send the entire dictionary every time it is encountered (default)
    ///
    /// A dictionary that extends a previously written one is treated as a replacement.
    #[default]
    Resend,
    /// Send only new dictionary values since the last batch (delta encoding)
    ///
    /// When a dictionary is first encountered, the entire dictionary is sent.
    /// For subsequent batches, only values that are new (not previously sent)
    /// are transmitted with the `isDelta` flag set to true.
    Delta,
}
1084
/// Describes what kind of update took place after a call to
/// [`DictionaryTracker::insert_column`].
#[derive(Debug, Clone)]
pub enum DictionaryUpdate {
    /// No dictionary was written, the dictionary was identical to what was already
    /// in the tracker.
    None,
    /// No dictionary was present in the tracker
    New,
    /// Dictionary was replaced with the new data
    Replaced,
    /// Dictionary was updated, ArrayData is the delta between old and new, i.e. the
    /// values of the new dictionary that follow the previously tracked values
    Delta(ArrayData),
}
1098
/// Keeps track of dictionaries that have been written, to avoid emitting the same dictionary
/// multiple times.
///
/// Can optionally error if an update to an existing dictionary is attempted, which
/// isn't allowed in the `FileWriter`.
#[derive(Debug)]
pub struct DictionaryTracker {
    // NOTE: When adding fields, update the clear() method accordingly.
    /// Data of the most recently tracked dictionary array, keyed by dictionary ID
    written: HashMap<i64, ArrayData>,
    /// Dictionary IDs handed out by `next_dict_id`, in the order they were assigned
    dict_ids: Vec<i64>,
    /// Whether replacing an already tracked dictionary with different data is an error
    error_on_replacement: bool,
}
1111
1112impl DictionaryTracker {
1113    /// Create a new [`DictionaryTracker`].
1114    ///
1115    /// If `error_on_replacement`
1116    /// is true, an error will be generated if an update to an
1117    /// existing dictionary is attempted.
1118    pub fn new(error_on_replacement: bool) -> Self {
1119        #[allow(deprecated)]
1120        Self {
1121            written: HashMap::new(),
1122            dict_ids: Vec::new(),
1123            error_on_replacement,
1124        }
1125    }
1126
1127    /// Record and return the next dictionary ID.
1128    pub fn next_dict_id(&mut self) -> i64 {
1129        let next = self
1130            .dict_ids
1131            .last()
1132            .copied()
1133            .map(|i| i + 1)
1134            .unwrap_or_default();
1135
1136        self.dict_ids.push(next);
1137        next
1138    }
1139
1140    /// Return the sequence of dictionary IDs in the order they should be observed while
1141    /// traversing the schema
1142    pub fn dict_id(&mut self) -> &[i64] {
1143        &self.dict_ids
1144    }
1145
1146    /// Keep track of the dictionary with the given ID and values. Behavior:
1147    ///
1148    /// * If this ID has been written already and has the same data, return `Ok(false)` to indicate
1149    ///   that the dictionary was not actually inserted (because it's already been seen).
1150    /// * If this ID has been written already but with different data, and this tracker is
1151    ///   configured to return an error, return an error.
1152    /// * If the tracker has not been configured to error on replacement or this dictionary
1153    ///   has never been seen before, return `Ok(true)` to indicate that the dictionary was just
1154    ///   inserted.
1155    #[deprecated(since = "56.1.0", note = "Use `insert_column` instead")]
1156    pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> {
1157        let dict_data = column.to_data();
1158        let dict_values = &dict_data.child_data()[0];
1159
1160        // If a dictionary with this id was already emitted, check if it was the same.
1161        if let Some(last) = self.written.get(&dict_id) {
1162            if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
1163                // Same dictionary values => no need to emit it again
1164                return Ok(false);
1165            }
1166            if self.error_on_replacement {
1167                // If error on replacement perform a logical comparison
1168                if last.child_data()[0] == *dict_values {
1169                    // Same dictionary values => no need to emit it again
1170                    return Ok(false);
1171                }
1172                return Err(ArrowError::InvalidArgumentError(
1173                    "Dictionary replacement detected when writing IPC file format. \
1174                     Arrow IPC files only support a single dictionary for a given field \
1175                     across all batches."
1176                        .to_string(),
1177                ));
1178            }
1179        }
1180
1181        self.written.insert(dict_id, dict_data);
1182        Ok(true)
1183    }
1184
1185    /// Keep track of the dictionary with the given ID and values. The return
1186    /// value indicates what, if any, update to the internal map took place
1187    /// and how it should be interpreted based on the `dict_handling` parameter.
1188    ///
1189    /// # Returns
1190    ///
1191    /// * `Ok(Dictionary::New)` - If the dictionary was not previously written
1192    /// * `Ok(Dictionary::Replaced)` - If the dictionary was previously written
1193    ///   with completely different data, or if the data is a delta of the existing,
1194    ///   but with `dict_handling` set to `DictionaryHandling::Resend`
1195    /// * `Ok(Dictionary::Delta)` - If the dictionary was previously written, but
1196    ///   the new data is a delta of the old and the `dict_handling` is set to
1197    ///   `DictionaryHandling::Delta`
1198    /// * `Err(e)` - If the dictionary was previously written with different data,
1199    ///   and `error_on_replacement` is set to `true`.
1200    pub fn insert_column(
1201        &mut self,
1202        dict_id: i64,
1203        column: &ArrayRef,
1204        dict_handling: DictionaryHandling,
1205    ) -> Result<DictionaryUpdate, ArrowError> {
1206        let new_data = column.to_data();
1207        let new_values = &new_data.child_data()[0];
1208
1209        // If there is no existing dictionary with this ID, we always insert
1210        let Some(old) = self.written.get(&dict_id) else {
1211            self.written.insert(dict_id, new_data);
1212            return Ok(DictionaryUpdate::New);
1213        };
1214
1215        // Fast path - If the array data points to the same buffer as the
1216        // existing then they're the same.
1217        let old_values = &old.child_data()[0];
1218        if ArrayData::ptr_eq(old_values, new_values) {
1219            return Ok(DictionaryUpdate::None);
1220        }
1221
1222        // Slow path - Compare the dictionaries value by value
1223        let comparison = compare_dictionaries(old_values, new_values);
1224        if matches!(comparison, DictionaryComparison::Equal) {
1225            return Ok(DictionaryUpdate::None);
1226        }
1227
1228        const REPLACEMENT_ERROR: &str = "Dictionary replacement detected when writing IPC file format. \
1229                 Arrow IPC files only support a single dictionary for a given field \
1230                 across all batches.";
1231
1232        match comparison {
1233            DictionaryComparison::NotEqual => {
1234                if self.error_on_replacement {
1235                    return Err(ArrowError::InvalidArgumentError(
1236                        REPLACEMENT_ERROR.to_string(),
1237                    ));
1238                }
1239
1240                self.written.insert(dict_id, new_data);
1241                Ok(DictionaryUpdate::Replaced)
1242            }
1243            DictionaryComparison::Delta => match dict_handling {
1244                DictionaryHandling::Resend => {
1245                    if self.error_on_replacement {
1246                        return Err(ArrowError::InvalidArgumentError(
1247                            REPLACEMENT_ERROR.to_string(),
1248                        ));
1249                    }
1250
1251                    self.written.insert(dict_id, new_data);
1252                    Ok(DictionaryUpdate::Replaced)
1253                }
1254                DictionaryHandling::Delta => {
1255                    let delta =
1256                        new_values.slice(old_values.len(), new_values.len() - old_values.len());
1257                    self.written.insert(dict_id, new_data);
1258                    Ok(DictionaryUpdate::Delta(delta))
1259                }
1260            },
1261            DictionaryComparison::Equal => unreachable!("Already checked equal case"),
1262        }
1263    }
1264
1265    /// Clears the state of the dictionary tracker.
1266    ///
1267    /// This allows the dictionary tracker to be reused for a new IPC stream while avoiding the
1268    /// allocation cost of creating a new instance. This method should not be called if
1269    /// the dictionary tracker will be used to continue writing to an existing IPC stream.
1270    pub fn clear(&mut self) {
1271        self.dict_ids.clear();
1272        self.written.clear();
1273    }
1274}
1275
/// Describes how two dictionary arrays compare to each other.
///
/// Produced by `compare_dictionaries`, which compares an old (already tracked)
/// dictionary with a new one.
#[derive(Debug, Clone)]
enum DictionaryComparison {
    /// Neither a delta, nor an exact match
    NotEqual,
    /// Exact element-wise match
    Equal,
    /// The two arrays are dictionary deltas of each other, meaning the first
    /// is a prefix of the second. The second array is always longer in this case.
    Delta,
}
1287
1288// Compares two dictionaries and returns a [`DictionaryComparison`].
1289fn compare_dictionaries(old: &ArrayData, new: &ArrayData) -> DictionaryComparison {
1290    // Check for exact match
1291    let existing_len = old.len();
1292    let new_len = new.len();
1293    if existing_len == new_len {
1294        if *old == *new {
1295            return DictionaryComparison::Equal;
1296        } else {
1297            return DictionaryComparison::NotEqual;
1298        }
1299    }
1300
1301    // Can't be a delta if the new is shorter than the existing
1302    if new_len < existing_len {
1303        return DictionaryComparison::NotEqual;
1304    }
1305
1306    // Check for delta
1307    if new.slice(0, existing_len) == *old {
1308        return DictionaryComparison::Delta;
1309    }
1310
1311    DictionaryComparison::NotEqual
1312}
1313
/// Arrow File Writer
///
/// Writes Arrow [`RecordBatch`]es in the [IPC File Format].
///
/// # See Also
///
/// * [`StreamWriter`] for writing IPC Streams
///
/// # Example
/// ```
/// # use arrow_array::record_batch;
/// # use arrow_ipc::writer::FileWriter;
/// # let mut file = vec![]; // mimic a file for the example
/// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// // create a new writer, the schema must be known in advance
/// let mut writer = FileWriter::try_new(&mut file, &batch.schema()).unwrap();
/// // write each batch to the underlying writer
/// writer.write(&batch).unwrap();
/// // When all batches are written, call finish to flush all buffers
/// writer.finish().unwrap();
/// ```
/// [IPC File Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format
pub struct FileWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// A reference to the schema, used in validating record batches
    schema: SchemaRef,
    /// Running count of bytes accounted for so far (header, schema message and all
    /// blocks); used as the offset of the next block recorded for the footer
    block_offsets: usize,
    /// Dictionary blocks that will be written as part of the IPC footer
    dictionary_blocks: Vec<crate::Block>,
    /// Record blocks that will be written as part of the IPC footer
    record_blocks: Vec<crate::Block>,
    /// Whether the writer footer has been written, and the writer is finished
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,
    /// User level customized metadata
    custom_metadata: HashMap<String, String>,

    /// Generator used to encode the schema message and to write record batches
    data_gen: IpcDataGenerator,

    /// Write context handed to `data_gen` on every call to `write`
    ipc_write_context: IpcWriteContext,
}
1360
1361impl<W: Write> FileWriter<BufWriter<W>> {
1362    /// Try to create a new file writer with the writer wrapped in a BufWriter.
1363    ///
1364    /// See [`FileWriter::try_new`] for an unbuffered version.
1365    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1366        Self::try_new(BufWriter::new(writer), schema)
1367    }
1368}
1369
1370impl<W: Write> FileWriter<W> {
1371    /// Try to create a new writer, with the schema written as part of the header
1372    ///
1373    /// Note the created writer is not buffered. See [`FileWriter::try_new_buffered`] for details.
1374    ///
1375    /// # Errors
1376    ///
1377    /// An ['Err'](Result::Err) may be returned if writing the header to the writer fails.
1378    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1379        let write_options = IpcWriteOptions::default();
1380        Self::try_new_with_options(writer, schema, write_options)
1381    }
1382
1383    /// Try to create a new writer with IpcWriteOptions
1384    ///
1385    /// Note the created writer is not buffered. See [`FileWriter::try_new_buffered`] for details.
1386    ///
1387    /// # Errors
1388    ///
1389    /// An ['Err'](Result::Err) may be returned if writing the header to the writer fails.
1390    pub fn try_new_with_options(
1391        mut writer: W,
1392        schema: &Schema,
1393        write_options: IpcWriteOptions,
1394    ) -> Result<Self, ArrowError> {
1395        ensure_supported_ipc_schema(schema)?;
1396
1397        let data_gen = IpcDataGenerator::default();
1398        // write magic to header aligned on alignment boundary
1399        let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
1400        let header_size = super::ARROW_MAGIC.len() + pad_len;
1401        writer.write_all(&super::ARROW_MAGIC)?;
1402        writer.write_all(&PADDING[..pad_len])?;
1403        // write the schema, set the written bytes to the schema + header
1404        let mut dictionary_tracker = DictionaryTracker::new(true);
1405        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
1406            schema,
1407            &mut dictionary_tracker,
1408            &write_options,
1409        );
1410        let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
1411        Ok(Self {
1412            writer,
1413            write_options,
1414            schema: Arc::new(schema.clone()),
1415            block_offsets: meta + data + header_size,
1416            dictionary_blocks: vec![],
1417            record_blocks: vec![],
1418            finished: false,
1419            dictionary_tracker,
1420            custom_metadata: HashMap::new(),
1421            data_gen,
1422            ipc_write_context: IpcWriteContext::default(),
1423        })
1424    }
1425
1426    /// Adds a key-value pair to the [FileWriter]'s custom metadata
1427    pub fn write_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
1428        self.custom_metadata.insert(key.into(), value.into());
1429    }
1430
1431    /// Write a record batch to the file
1432    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1433        if self.finished {
1434            return Err(ArrowError::IpcError(
1435                "Cannot write record batch to file writer as it is closed".to_string(),
1436            ));
1437        }
1438
1439        let meta = self.data_gen.write(
1440            batch,
1441            &mut self.dictionary_tracker,
1442            &self.write_options,
1443            &mut self.ipc_write_context,
1444            &mut self.writer,
1445        )?;
1446
1447        for (header_len, body_len) in meta.dictionary_block_sizes {
1448            let block = crate::Block::new(
1449                self.block_offsets as i64,
1450                header_len as i32,
1451                body_len as i64,
1452            );
1453            self.dictionary_blocks.push(block);
1454            self.block_offsets += header_len + body_len;
1455        }
1456
1457        // add a record block for the footer
1458        let block = crate::Block::new(
1459            self.block_offsets as i64,
1460            meta.padded_header_len as i32,
1461            meta.body_len as i64,
1462        );
1463        self.record_blocks.push(block);
1464        self.block_offsets += meta.padded_header_len + meta.body_len;
1465        Ok(())
1466    }
1467
1468    /// Write footer and closing tag, then mark the writer as done
1469    pub fn finish(&mut self) -> Result<(), ArrowError> {
1470        if self.finished {
1471            return Err(ArrowError::IpcError(
1472                "Cannot write footer to file writer as it is closed".to_string(),
1473            ));
1474        }
1475
1476        // write EOS
1477        write_continuation(&mut self.writer, &self.write_options, 0)?;
1478
1479        let mut fbb = FlatBufferBuilder::new();
1480        let dictionaries = fbb.create_vector(&self.dictionary_blocks);
1481        let record_batches = fbb.create_vector(&self.record_blocks);
1482
1483        // dictionaries are already written, so we can reset dictionary tracker to reuse for schema
1484        self.dictionary_tracker.clear();
1485        let schema = IpcSchemaEncoder::new()
1486            .with_dictionary_tracker(&mut self.dictionary_tracker)
1487            .schema_to_fb_offset(&mut fbb, &self.schema);
1488        let fb_custom_metadata = (!self.custom_metadata.is_empty())
1489            .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));
1490
1491        let root = {
1492            let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
1493            footer_builder.add_version(self.write_options.metadata_version);
1494            footer_builder.add_schema(schema);
1495            footer_builder.add_dictionaries(dictionaries);
1496            footer_builder.add_recordBatches(record_batches);
1497            if let Some(fb_custom_metadata) = fb_custom_metadata {
1498                footer_builder.add_custom_metadata(fb_custom_metadata);
1499            }
1500            footer_builder.finish()
1501        };
1502        fbb.finish(root, None);
1503        let footer_data = fbb.finished_data();
1504        self.writer.write_all(footer_data)?;
1505        self.writer
1506            .write_all(&(footer_data.len() as i32).to_le_bytes())?;
1507        self.writer.write_all(&super::ARROW_MAGIC)?;
1508        self.writer.flush()?;
1509        self.finished = true;
1510
1511        Ok(())
1512    }
1513
1514    /// Returns the arrow [`SchemaRef`] for this arrow file.
1515    pub fn schema(&self) -> &SchemaRef {
1516        &self.schema
1517    }
1518
1519    /// Gets a reference to the underlying writer.
1520    pub fn get_ref(&self) -> &W {
1521        &self.writer
1522    }
1523
1524    /// Gets a mutable reference to the underlying writer.
1525    ///
1526    /// It is inadvisable to directly write to the underlying writer.
1527    pub fn get_mut(&mut self) -> &mut W {
1528        &mut self.writer
1529    }
1530
1531    /// Flush the underlying writer.
1532    ///
1533    /// Both the BufWriter and the underlying writer are flushed.
1534    pub fn flush(&mut self) -> Result<(), ArrowError> {
1535        self.writer.flush()?;
1536        Ok(())
1537    }
1538
1539    /// Unwraps the underlying writer.
1540    ///
1541    /// The writer is flushed and the FileWriter is finished before returning.
1542    ///
1543    /// # Errors
1544    ///
1545    /// An ['Err'](Result::Err) may be returned if an error occurs while finishing the StreamWriter
1546    /// or while flushing the writer.
1547    pub fn into_inner(mut self) -> Result<W, ArrowError> {
1548        if !self.finished {
1549            // `finish` flushes the writer.
1550            self.finish()?;
1551        }
1552        Ok(self.writer)
1553    }
1554}
1555
/// [`RecordBatchWriter`] implementation that forwards to the inherent methods of
/// [`FileWriter`].
impl<W: Write> RecordBatchWriter for FileWriter<W> {
    /// Writes `batch` by delegating to [`FileWriter::write`].
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        // Method resolution picks the inherent `FileWriter::write` here, so this does
        // not recurse into the trait method.
        self.write(batch)
    }

    /// Consumes the writer and finishes the file via [`FileWriter::finish`].
    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}
1565
1566/// Arrow Stream Writer
1567///
1568/// Writes Arrow [`RecordBatch`]es to bytes using the [IPC Streaming Format].
1569///
1570/// # See Also
1571///
1572/// * [`FileWriter`] for writing IPC Files
1573///
1574/// # Example - Basic usage
1575/// ```
1576/// # use arrow_array::record_batch;
1577/// # use arrow_ipc::writer::StreamWriter;
1578/// # let mut stream = vec![]; // mimic a stream for the example
1579/// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
1580/// // create a new writer, the schema must be known in advance
1581/// let mut writer = StreamWriter::try_new(&mut stream, &batch.schema()).unwrap();
1582/// // write each batch to the underlying stream
1583/// writer.write(&batch).unwrap();
1584/// // When all batches are written, call finish to flush all buffers
1585/// writer.finish().unwrap();
1586/// ```
1587/// # Example - Efficient delta dictionaries
1588/// ```
1589/// # use arrow_array::record_batch;
1590/// # use arrow_ipc::writer::{StreamWriter, IpcWriteOptions};
1591/// # use arrow_ipc::writer::DictionaryHandling;
1592/// # use arrow_schema::{DataType, Field, Schema, SchemaRef};
1593/// # use arrow_array::{
1594/// #    builder::StringDictionaryBuilder, types::Int32Type, Array, ArrayRef, DictionaryArray,
1595/// #    RecordBatch, StringArray,
1596/// # };
1597/// # use std::sync::Arc;
1598///
1599/// let schema = Arc::new(Schema::new(vec![Field::new(
1600///    "col1",
1601///    DataType::Dictionary(Box::from(DataType::Int32), Box::from(DataType::Utf8)),
1602///    true,
1603/// )]));
1604///
1605/// let mut builder = StringDictionaryBuilder::<arrow_array::types::Int32Type>::new();
1606///
1607/// // `finish_preserve_values` will keep the dictionary values along with their
1608/// // key assignments so that they can be re-used in the next batch.
1609/// builder.append("a").unwrap();
1610/// builder.append("b").unwrap();
1611/// let array1 = builder.finish_preserve_values();
1612/// let batch1 = RecordBatch::try_new(schema.clone(), vec![Arc::new(array1) as ArrayRef]).unwrap();
1613///
1614/// // In this batch, 'a' will have the same dictionary key as 'a' in the previous batch,
1615/// // and 'd' will take the next available key.
1616/// builder.append("a").unwrap();
1617/// builder.append("d").unwrap();
1618/// let array2 = builder.finish_preserve_values();
1619/// let batch2 = RecordBatch::try_new(schema.clone(), vec![Arc::new(array2) as ArrayRef]).unwrap();
1620///
1621/// let mut stream = vec![];
1622/// // You must set `.with_dictionary_handling(DictionaryHandling::Delta)` to
1623/// // enable delta dictionaries in the writer
1624/// let options = IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta);
1625/// let mut writer = StreamWriter::try_new_with_options(&mut stream, &schema, options).unwrap();
1626///
1627/// // When writing the first batch, a dictionary message with 'a' and 'b' will be written
1628/// // prior to the record batch.
1629/// writer.write(&batch1).unwrap();
1630/// // With the second batch only a delta dictionary with 'd' will be written
1631/// // prior to the record batch. This is only possible with `finish_preserve_values`.
1632/// // Without it, 'a' and 'd' in this batch would have different keys than the
1633/// // first batch and so we'd have to send a replacement dictionary with new keys
1634/// // for both.
1635/// writer.write(&batch2).unwrap();
1636/// writer.finish().unwrap();
1637/// ```
1638/// [IPC Streaming Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
1639pub struct StreamWriter<W> {
1640    /// The object to write to
1641    writer: W,
1642    /// IPC write options
1643    write_options: IpcWriteOptions,
1644    /// Whether the writer footer has been written, and the writer is finished
1645    finished: bool,
1646    /// Keeps track of dictionaries that have been written
1647    dictionary_tracker: DictionaryTracker,
1648
1649    data_gen: IpcDataGenerator,
1650
1651    ipc_write_context: IpcWriteContext,
1652}
1653
1654impl<W: Write> StreamWriter<BufWriter<W>> {
1655    /// Try to create a new stream writer with the writer wrapped in a BufWriter.
1656    ///
1657    /// See [`StreamWriter::try_new`] for an unbuffered version.
1658    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1659        Self::try_new(BufWriter::new(writer), schema)
1660    }
1661}
1662
1663impl<W: Write> StreamWriter<W> {
1664    /// Try to create a new writer, with the schema written as part of the header.
1665    ///
1666    /// Note that there is no internal buffering. See also [`StreamWriter::try_new_buffered`].
1667    ///
1668    /// # Errors
1669    ///
1670    /// An ['Err'](Result::Err) may be returned if writing the header to the writer fails.
1671    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1672        let write_options = IpcWriteOptions::default();
1673        Self::try_new_with_options(writer, schema, write_options)
1674    }
1675
1676    /// Try to create a new writer with [`IpcWriteOptions`].
1677    ///
1678    /// # Errors
1679    ///
1680    /// An ['Err'](Result::Err) may be returned if writing the header to the writer fails.
1681    pub fn try_new_with_options(
1682        mut writer: W,
1683        schema: &Schema,
1684        write_options: IpcWriteOptions,
1685    ) -> Result<Self, ArrowError> {
1686        ensure_supported_ipc_schema(schema)?;
1687
1688        let data_gen = IpcDataGenerator::default();
1689        let mut dictionary_tracker = DictionaryTracker::new(false);
1690
1691        // write the schema, set the written bytes to the schema
1692        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
1693            schema,
1694            &mut dictionary_tracker,
1695            &write_options,
1696        );
1697        write_message(&mut writer, encoded_message, &write_options)?;
1698        Ok(Self {
1699            writer,
1700            write_options,
1701            finished: false,
1702            dictionary_tracker,
1703            data_gen,
1704            ipc_write_context: IpcWriteContext::default(),
1705        })
1706    }
1707
1708    /// Write a record batch to the stream
1709    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1710        if self.finished {
1711            return Err(ArrowError::IpcError(
1712                "Cannot write record batch to stream writer as it is closed".to_string(),
1713            ));
1714        }
1715
1716        self.data_gen.write(
1717            batch,
1718            &mut self.dictionary_tracker,
1719            &self.write_options,
1720            &mut self.ipc_write_context,
1721            &mut self.writer,
1722        )?;
1723        Ok(())
1724    }
1725
1726    /// Write continuation bytes, and mark the stream as done
1727    pub fn finish(&mut self) -> Result<(), ArrowError> {
1728        if self.finished {
1729            return Err(ArrowError::IpcError(
1730                "Cannot write footer to stream writer as it is closed".to_string(),
1731            ));
1732        }
1733
1734        write_continuation(&mut self.writer, &self.write_options, 0)?;
1735        self.writer.flush()?;
1736
1737        self.finished = true;
1738
1739        Ok(())
1740    }
1741
1742    /// Gets a reference to the underlying writer.
1743    pub fn get_ref(&self) -> &W {
1744        &self.writer
1745    }
1746
1747    /// Gets a mutable reference to the underlying writer.
1748    ///
1749    /// It is inadvisable to directly write to the underlying writer.
1750    pub fn get_mut(&mut self) -> &mut W {
1751        &mut self.writer
1752    }
1753
1754    /// Flush the underlying writer.
1755    ///
1756    /// Both the BufWriter and the underlying writer are flushed.
1757    pub fn flush(&mut self) -> Result<(), ArrowError> {
1758        self.writer.flush()?;
1759        Ok(())
1760    }
1761
1762    /// Unwraps the the underlying writer.
1763    ///
1764    /// The writer is flushed and the StreamWriter is finished before returning.
1765    ///
1766    /// # Errors
1767    ///
1768    /// An ['Err'](Result::Err) may be returned if an error occurs while finishing the StreamWriter
1769    /// or while flushing the writer.
1770    ///
1771    /// # Example
1772    ///
1773    /// ```
1774    /// # use arrow_ipc::writer::{StreamWriter, IpcWriteOptions};
1775    /// # use arrow_ipc::MetadataVersion;
1776    /// # use arrow_schema::{ArrowError, Schema};
1777    /// # fn main() -> Result<(), ArrowError> {
1778    /// // The result we expect from an empty schema
1779    /// let expected = vec![
1780    ///     255, 255, 255, 255,  48,   0,   0,   0,
1781    ///      16,   0,   0,   0,   0,   0,  10,   0,
1782    ///      12,   0,  10,   0,   9,   0,   4,   0,
1783    ///      10,   0,   0,   0,  16,   0,   0,   0,
1784    ///       0,   1,   4,   0,   8,   0,   8,   0,
1785    ///       0,   0,   4,   0,   8,   0,   0,   0,
1786    ///       4,   0,   0,   0,   0,   0,   0,   0,
1787    ///     255, 255, 255, 255,   0,   0,   0,   0
1788    /// ];
1789    ///
1790    /// let schema = Schema::empty();
1791    /// let buffer: Vec<u8> = Vec::new();
1792    /// let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)?;
1793    /// let stream_writer = StreamWriter::try_new_with_options(buffer, &schema, options)?;
1794    ///
1795    /// assert_eq!(stream_writer.into_inner()?, expected);
1796    /// # Ok(())
1797    /// # }
1798    /// ```
1799    pub fn into_inner(mut self) -> Result<W, ArrowError> {
1800        if !self.finished {
1801            // `finish` flushes.
1802            self.finish()?;
1803        }
1804        Ok(self.writer)
1805    }
1806}
1807
1808impl<W: Write> RecordBatchWriter for StreamWriter<W> {
1809    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1810        self.write(batch)
1811    }
1812
1813    fn close(mut self) -> Result<(), ArrowError> {
1814        self.finish()
1815    }
1816}
1817
/// Holds one encoded IPC message: the serialised `crate::Message` metadata plus the
/// optional Arrow body bytes that accompany it.
pub struct EncodedData {
    /// The encoded `crate::Message`
    pub ipc_message: Vec<u8>,
    /// Arrow buffers to write after the message; should be an empty vec for schema messages
    pub arrow_data: Vec<u8>,
}
1825/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
1826pub fn write_message<W: Write>(
1827    mut writer: W,
1828    encoded: EncodedData,
1829    write_options: &IpcWriteOptions,
1830) -> Result<(usize, usize), ArrowError> {
1831    let arrow_data_len = encoded.arrow_data.len();
1832    if arrow_data_len % usize::from(write_options.alignment) != 0 {
1833        return Err(ArrowError::MemoryError(
1834            "Arrow data not aligned".to_string(),
1835        ));
1836    }
1837
1838    let a = usize::from(write_options.alignment - 1);
1839    let buffer = encoded.ipc_message;
1840    let flatbuf_size = buffer.len();
1841    let prefix_size = if write_options.write_legacy_ipc_format {
1842        4
1843    } else {
1844        8
1845    };
1846    let aligned_size = (flatbuf_size + prefix_size + a) & !a;
1847    let padding_bytes = aligned_size - flatbuf_size - prefix_size;
1848
1849    write_continuation(
1850        &mut writer,
1851        write_options,
1852        (aligned_size - prefix_size) as i32,
1853    )?;
1854
1855    // write the flatbuf
1856    if flatbuf_size > 0 {
1857        writer.write_all(&buffer)?;
1858    }
1859    // write padding
1860    writer.write_all(&PADDING[..padding_bytes])?;
1861
1862    // write arrow data
1863    let body_len = if arrow_data_len > 0 {
1864        write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
1865    } else {
1866        0
1867    };
1868
1869    Ok((aligned_size, body_len))
1870}
1871
1872fn write_body_buffers<W: Write>(
1873    mut writer: W,
1874    data: &[u8],
1875    alignment: u8,
1876) -> Result<usize, ArrowError> {
1877    let len = data.len();
1878    let pad_len = pad_to_alignment(alignment, len);
1879    let total_len = len + pad_len;
1880
1881    // write body buffer
1882    writer.write_all(data)?;
1883    if pad_len > 0 {
1884        writer.write_all(&PADDING[..pad_len])?;
1885    }
1886
1887    Ok(total_len)
1888}
1889
1890/// Write a record batch to the writer, writing the message size before the message
1891/// if the record batch is being written to a stream
1892fn write_continuation<W: Write>(
1893    mut writer: W,
1894    write_options: &IpcWriteOptions,
1895    total_len: i32,
1896) -> Result<usize, ArrowError> {
1897    let mut written = 8;
1898
1899    // the version of the writer determines whether continuation markers should be added
1900    match write_options.metadata_version {
1901        crate::MetadataVersion::V1 | crate::MetadataVersion::V2 | crate::MetadataVersion::V3 => {
1902            unreachable!("Options with the metadata version cannot be created")
1903        }
1904        crate::MetadataVersion::V4 => {
1905            if !write_options.write_legacy_ipc_format {
1906                // v0.15.0 format
1907                writer.write_all(&CONTINUATION_MARKER)?;
1908                written = 4;
1909            }
1910            writer.write_all(&total_len.to_le_bytes()[..])?;
1911        }
1912        crate::MetadataVersion::V5 => {
1913            // write continuation marker and message length
1914            writer.write_all(&CONTINUATION_MARKER)?;
1915            writer.write_all(&total_len.to_le_bytes()[..])?;
1916        }
1917        z => panic!("Unsupported crate::MetadataVersion {z:?}"),
1918    };
1919
1920    Ok(written)
1921}
1922
1923/// In V4, null types have no validity bitmap
1924/// In V5 and later, null and union types have no validity bitmap
1925/// Run end encoded type has no validity bitmap.
1926fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) -> bool {
1927    if write_options.metadata_version < crate::MetadataVersion::V5 {
1928        !matches!(data_type, DataType::Null)
1929    } else {
1930        !matches!(
1931            data_type,
1932            DataType::Null | DataType::Union(_, _) | DataType::RunEndEncoded(_, _)
1933        )
1934    }
1935}
1936
1937/// Whether to truncate the buffer
1938#[inline]
1939fn buffer_need_truncate(
1940    array_offset: usize,
1941    buffer: &Buffer,
1942    spec: &BufferSpec,
1943    min_length: usize,
1944) -> bool {
1945    spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
1946}
1947
1948/// Returns byte width for a buffer spec. Only for `BufferSpec::FixedWidth`.
1949#[inline]
1950fn get_buffer_element_width(spec: &BufferSpec) -> usize {
1951    match spec {
1952        BufferSpec::FixedWidth { byte_width, .. } => *byte_width,
1953        _ => 0,
1954    }
1955}
1956
1957/// Common functionality for re-encoding offsets. Returns the new offsets as well as
1958/// original start offset and length for use in slicing child data.
1959fn reencode_offsets<O: OffsetSizeTrait>(
1960    offsets: &Buffer,
1961    data: &ArrayData,
1962) -> (Buffer, usize, usize) {
1963    let offsets_slice: &[O] = offsets.typed_data::<O>();
1964    let offset_slice = &offsets_slice[data.offset()..data.offset() + data.len() + 1];
1965
1966    let start_offset = offset_slice.first().unwrap();
1967    let end_offset = offset_slice.last().unwrap();
1968
1969    let offsets = match start_offset.as_usize() {
1970        0 => {
1971            let size = size_of::<O>();
1972            offsets.slice_with_length(data.offset() * size, (data.len() + 1) * size)
1973        }
1974        _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
1975    };
1976
1977    let start_offset = start_offset.as_usize();
1978    let end_offset = end_offset.as_usize();
1979
1980    (offsets, start_offset, end_offset - start_offset)
1981}
1982
1983/// Returns the values and offsets [`Buffer`] for a ByteArray with offset type `O`
1984///
1985/// In particular, this handles re-encoding the offsets if they don't start at `0`,
1986/// slicing the values buffer as appropriate. This helps reduce the encoded
1987/// size of sliced arrays, as values that have been sliced away are not encoded
1988fn get_byte_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Buffer) {
1989    if data.is_empty() {
1990        // As per specification, offsets buffer has N+1 elements.
1991        // So an empty array should still be encoded with a single 0 offset.
1992        let mut offsets = MutableBuffer::new(size_of::<O>());
1993        offsets.extend_from_slice(O::usize_as(0).to_byte_slice());
1994        return (offsets.into(), MutableBuffer::new(0).into());
1995    }
1996
1997    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1998    let values = data.buffers()[1].slice_with_length(original_start_offset, len);
1999    (offsets, values)
2000}
2001
2002/// Similar logic as [`get_byte_array_buffers()`] but slices the child array instead
2003/// of a values buffer.
2004fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, ArrayData) {
2005    if data.is_empty() {
2006        // As per specification, offsets buffer has N+1 elements.
2007        // So an empty array should still be encoded with a single 0 offset.
2008        let mut offsets = MutableBuffer::new(size_of::<O>());
2009        offsets.extend_from_slice(O::usize_as(0).to_byte_slice());
2010        return (offsets.into(), data.child_data()[0].slice(0, 0));
2011    }
2012
2013    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
2014    let child_data = data.child_data()[0].slice(original_start_offset, len);
2015    (offsets, child_data)
2016}
2017
2018/// Returns the offsets, sizes, and child data buffers for a ListView array.
2019///
2020/// Unlike List arrays, ListView arrays store both offsets and sizes explicitly,
2021/// and offsets can be non-monotonic. When slicing, we simply pass through the
2022/// offsets and sizes without re-encoding, and do not slice the child data.
2023fn get_list_view_array_buffers<O: OffsetSizeTrait>(
2024    data: &ArrayData,
2025) -> (Buffer, Buffer, ArrayData) {
2026    if data.is_empty() {
2027        return (
2028            MutableBuffer::new(0).into(),
2029            MutableBuffer::new(0).into(),
2030            data.child_data()[0].slice(0, 0),
2031        );
2032    }
2033
2034    let offsets = &data.buffers()[0];
2035    let sizes = &data.buffers()[1];
2036
2037    let element_size = std::mem::size_of::<O>();
2038    let offsets_slice =
2039        offsets.slice_with_length(data.offset() * element_size, data.len() * element_size);
2040    let sizes_slice =
2041        sizes.slice_with_length(data.offset() * element_size, data.len() * element_size);
2042
2043    let child_data = data.child_data()[0].clone();
2044
2045    (offsets_slice, sizes_slice, child_data)
2046}
2047
2048/// Returns the sliced views [`Buffer`] for a BinaryView/Utf8View array.
2049///
2050/// The views buffer is sliced to only include views in the valid range based on
2051/// the array's offset and length. This helps reduce the encoded size of sliced
2052/// arrays
2053///
2054fn get_or_truncate_buffer(array_data: &ArrayData) -> Buffer {
2055    let buffer = &array_data.buffers()[0];
2056    let layout = layout(array_data.data_type());
2057    let spec = &layout.buffers[0];
2058
2059    let byte_width = get_buffer_element_width(spec);
2060    let min_length = array_data.len() * byte_width;
2061    if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
2062        let byte_offset = array_data.offset() * byte_width;
2063        let buffer_length = min(min_length, buffer.len() - byte_offset);
2064        buffer.slice_with_length(byte_offset, buffer_length)
2065    } else {
2066        buffer.clone()
2067    }
2068}
2069
2070/// Recursively encodes `array_data` into its IPC representation.
2071///
2072/// Output goes to two separate channels:
2073/// - `meta`: accumulates IPC metadata (`nodes` and `buffers`) for the flatbuffer header.
2074/// - `sink`: the raw Arrow data bytes that form the IPC message body.
2075fn write_array_data(
2076    array_data: &ArrayData,
2077    meta: &mut IpcMetadataBuilder,
2078    sink: &mut IpcBodySink<'_>,
2079    offset: i64,
2080    compression_codec: Option<CompressionCodec>,
2081    ipc_write_context: &mut IpcWriteContext,
2082    write_options: &IpcWriteOptions,
2083) -> Result<i64, ArrowError> {
2084    let mut offset = offset;
2085    let num_rows = array_data.len();
2086    if !matches!(array_data.data_type(), DataType::Null) {
2087        meta.nodes.push(crate::FieldNode::new(
2088            num_rows as i64,
2089            array_data.null_count() as i64,
2090        ));
2091    } else {
2092        // NullArray's null_count equals to len, but ArrayData null_count is always 0.
2093        meta.nodes
2094            .push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
2095    }
2096    if has_validity_bitmap(array_data.data_type(), write_options) {
2097        // write null buffer if exists
2098        let null_buffer = match array_data.nulls() {
2099            None => {
2100                // create a buffer and fill it with valid bits
2101                let num_bytes = bit_util::ceil(num_rows, 8);
2102                let buffer = MutableBuffer::new(num_bytes);
2103                let buffer = buffer.with_bitset(num_bytes, true);
2104                buffer.into()
2105            }
2106            Some(buffer) => buffer.inner().sliced(),
2107        };
2108
2109        offset = encode_sink_buffer(
2110            null_buffer,
2111            meta,
2112            sink,
2113            offset,
2114            compression_codec,
2115            ipc_write_context,
2116            write_options.alignment,
2117        )?;
2118    }
2119
2120    let data_type = array_data.data_type();
2121    if matches!(data_type, DataType::Binary | DataType::Utf8) {
2122        let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
2123        for buffer in [offsets, values] {
2124            offset = encode_sink_buffer(
2125                buffer,
2126                meta,
2127                sink,
2128                offset,
2129                compression_codec,
2130                ipc_write_context,
2131                write_options.alignment,
2132            )?;
2133        }
2134    } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
2135        // Slicing the views buffer is safe and easy,
2136        // but pruning unneeded data buffers is much more nuanced since it's complicated to prove that no views reference the pruned buffers
2137        //
2138        // Current implementation just serialize the raw arrays as given and not try to optimize anything.
2139        // If users wants to "compact" the arrays prior to sending them over IPC,
2140        // they should consider the gc API suggested in #5513
2141        let views = get_or_truncate_buffer(array_data);
2142        offset = encode_sink_buffer(
2143            views,
2144            meta,
2145            sink,
2146            offset,
2147            compression_codec,
2148            ipc_write_context,
2149            write_options.alignment,
2150        )?;
2151
2152        for buffer in array_data.buffers().iter().skip(1) {
2153            offset = encode_sink_buffer(
2154                buffer.clone(),
2155                meta,
2156                sink,
2157                offset,
2158                compression_codec,
2159                ipc_write_context,
2160                write_options.alignment,
2161            )?;
2162        }
2163    } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
2164        let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
2165        for buffer in [offsets, values] {
2166            offset = encode_sink_buffer(
2167                buffer,
2168                meta,
2169                sink,
2170                offset,
2171                compression_codec,
2172                ipc_write_context,
2173                write_options.alignment,
2174            )?;
2175        }
2176    } else if DataType::is_numeric(data_type)
2177        || DataType::is_temporal(data_type)
2178        || matches!(
2179            array_data.data_type(),
2180            DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
2181        )
2182    {
2183        // Truncate values
2184        assert_eq!(array_data.buffers().len(), 1);
2185
2186        let buffer = get_or_truncate_buffer(array_data);
2187        offset = encode_sink_buffer(
2188            buffer,
2189            meta,
2190            sink,
2191            offset,
2192            compression_codec,
2193            ipc_write_context,
2194            write_options.alignment,
2195        )?;
2196    } else if matches!(data_type, DataType::Boolean) {
2197        // Bools are special because the payload (= 1 bit) is smaller than the physical container elements (= bytes).
2198        // The array data may not start at the physical boundary of the underlying buffer, so we need to shift bits around.
2199        assert_eq!(array_data.buffers().len(), 1);
2200
2201        let buffer = &array_data.buffers()[0];
2202        let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
2203        offset = encode_sink_buffer(
2204            buffer,
2205            meta,
2206            sink,
2207            offset,
2208            compression_codec,
2209            ipc_write_context,
2210            write_options.alignment,
2211        )?;
2212    } else if matches!(
2213        data_type,
2214        DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _)
2215    ) {
2216        assert_eq!(array_data.buffers().len(), 1);
2217        assert_eq!(array_data.child_data().len(), 1);
2218
2219        // Truncate offsets and the child data to avoid writing unnecessary data
2220        let (offsets, sliced_child_data) = match data_type {
2221            DataType::List(_) => get_list_array_buffers::<i32>(array_data),
2222            DataType::Map(_, _) => get_list_array_buffers::<i32>(array_data),
2223            DataType::LargeList(_) => get_list_array_buffers::<i64>(array_data),
2224            _ => unreachable!(),
2225        };
2226        offset = encode_sink_buffer(
2227            offsets,
2228            meta,
2229            sink,
2230            offset,
2231            compression_codec,
2232            ipc_write_context,
2233            write_options.alignment,
2234        )?;
2235        offset = write_array_data(
2236            &sliced_child_data,
2237            meta,
2238            sink,
2239            offset,
2240            compression_codec,
2241            ipc_write_context,
2242            write_options,
2243        )?;
2244        return Ok(offset);
2245    } else if matches!(
2246        data_type,
2247        DataType::ListView(_) | DataType::LargeListView(_)
2248    ) {
2249        assert_eq!(array_data.buffers().len(), 2); // offsets + sizes
2250        assert_eq!(array_data.child_data().len(), 1);
2251
2252        let (offsets, sizes, child_data) = match data_type {
2253            DataType::ListView(_) => get_list_view_array_buffers::<i32>(array_data),
2254            DataType::LargeListView(_) => get_list_view_array_buffers::<i64>(array_data),
2255            _ => unreachable!(),
2256        };
2257
2258        offset = encode_sink_buffer(
2259            offsets,
2260            meta,
2261            sink,
2262            offset,
2263            compression_codec,
2264            ipc_write_context,
2265            write_options.alignment,
2266        )?;
2267        offset = encode_sink_buffer(
2268            sizes,
2269            meta,
2270            sink,
2271            offset,
2272            compression_codec,
2273            ipc_write_context,
2274            write_options.alignment,
2275        )?;
2276
2277        offset = write_array_data(
2278            &child_data,
2279            meta,
2280            sink,
2281            offset,
2282            compression_codec,
2283            ipc_write_context,
2284            write_options,
2285        )?;
2286        return Ok(offset);
2287    } else if let DataType::FixedSizeList(_, fixed_size) = data_type {
2288        assert_eq!(array_data.child_data().len(), 1);
2289        let fixed_size = *fixed_size as usize;
2290
2291        let child_offset = array_data.offset() * fixed_size;
2292        let child_length = array_data.len() * fixed_size;
2293        let child_data = array_data.child_data()[0].slice(child_offset, child_length);
2294
2295        offset = write_array_data(
2296            &child_data,
2297            meta,
2298            sink,
2299            offset,
2300            compression_codec,
2301            ipc_write_context,
2302            write_options,
2303        )?;
2304        return Ok(offset);
2305    } else {
2306        for buffer in array_data.buffers() {
2307            offset = encode_sink_buffer(
2308                buffer.clone(),
2309                meta,
2310                sink,
2311                offset,
2312                compression_codec,
2313                ipc_write_context,
2314                write_options.alignment,
2315            )?;
2316        }
2317    }
2318
2319    match array_data.data_type() {
2320        DataType::Dictionary(_, _) => {}
2321        DataType::RunEndEncoded(_, _) => {
2322            // unslice the run encoded array.
2323            let arr = unslice_run_array(array_data.clone())?;
2324            // recursively write out nested structures
2325            for data_ref in arr.child_data() {
2326                // write the nested data (e.g list data)
2327                offset = write_array_data(
2328                    data_ref,
2329                    meta,
2330                    sink,
2331                    offset,
2332                    compression_codec,
2333                    ipc_write_context,
2334                    write_options,
2335                )?;
2336            }
2337        }
2338        _ => {
2339            // recursively write out nested structures
2340            for data_ref in array_data.child_data() {
2341                // write the nested data (e.g list data)
2342                offset = write_array_data(
2343                    data_ref,
2344                    meta,
2345                    sink,
2346                    offset,
2347                    compression_codec,
2348                    ipc_write_context,
2349                    write_options,
2350                )?;
2351            }
2352        }
2353    }
2354    Ok(offset)
2355}
2356
2357/// Encodes a single Arrow [`Buffer`] into the IPC body and records its metadata.
2358///
2359/// - `buffer`: the Arrow data buffer to encode (validity bitmap, offsets, values, etc.)
2360/// - `buffers`: in-progress list of IPC `Buffer` metadata entries (body offset + length) that
2361///   will eventually be serialised into the flatbuffer `RecordBatch` header.
2362/// - `sink`: destination for the actual encoded bytes; either a contiguous `Vec<u8>` for
2363///   in-memory writes, or a list of [`EncodedBuffer`] segments for deferred zero-copy streaming.
2364/// - `offset`: running byte offset into the IPC message body, used to compute the metadata entry.
2365/// - `compression_codec` / `ipc_write_context`: if `Some`, the buffer is compressed before
2366///   writing; `ipc_write_context` provides reusable scratch space across calls.
2367/// - `alignment`: each buffer is padded to this many bytes so the next buffer starts aligned.
2368///
2369/// Returns the updated `offset` (advanced by the encoded length plus any alignment padding).
2370fn encode_sink_buffer(
2371    buffer: Buffer,
2372    ipc_meta_data: &mut IpcMetadataBuilder,
2373    sink: &mut IpcBodySink<'_>,
2374    offset: i64,
2375    compression_codec: Option<CompressionCodec>,
2376    ipc_write_context: &mut IpcWriteContext,
2377    alignment: u8,
2378) -> Result<i64, ArrowError> {
2379    let (encoded, len) = match compression_codec {
2380        None => {
2381            let len = buffer.len() as i64;
2382            (EncodedBuffer::Raw(buffer), len)
2383        }
2384        Some(codec) => {
2385            let mut scratch = Vec::new();
2386            let written =
2387                codec.compress_to_vec(buffer.as_slice(), &mut scratch, ipc_write_context)?;
2388            let len = i64::try_from(written)
2389                .map_err(|e| ArrowError::InvalidArgumentError(format!("{e}")))?;
2390            (EncodedBuffer::Compressed(scratch), len)
2391        }
2392    };
2393
2394    let pad_len = pad_to_alignment(alignment, len as usize);
2395    sink.write(pad_len, encoded);
2396    ipc_meta_data.buffers.push(crate::Buffer::new(offset, len));
2397    Ok(offset + len + pad_len as i64)
2398}
2399
/// A block of 64 zero bytes.
///
/// NOTE(review): presumably the source of the zero bytes emitted as alignment padding after
/// each buffer (64 being the default alignment); its users are outside this section — confirm.
const PADDING: [u8; 64] = [0; 64];
2401
2402/// Estimates the number of [`EncodedBuffer`] segments that [`write_array_data`]
2403/// will produce for a column of the given type.
2404///
2405/// Based on the Arrow IPC buffer layout
2406/// (<https://arrow.apache.org/docs/format/Columnar.html#recordbatch-message>):
2407#[inline]
2408fn estimate_encoded_buffer_count(dt: &DataType) -> usize {
2409    match dt {
2410        DataType::Null => 0,
2411
2412        DataType::Binary | DataType::Utf8 | DataType::LargeBinary | DataType::LargeUtf8 => 3,
2413
2414        DataType::BinaryView | DataType::Utf8View => 3,
2415
2416        DataType::List(f) | DataType::LargeList(f) | DataType::Map(f, _) => {
2417            2 + estimate_encoded_buffer_count(f.data_type())
2418        }
2419
2420        DataType::ListView(f) | DataType::LargeListView(f) => {
2421            3 + estimate_encoded_buffer_count(f.data_type())
2422        }
2423
2424        DataType::FixedSizeList(f, _) => 1 + estimate_encoded_buffer_count(f.data_type()),
2425
2426        DataType::Struct(fields) => {
2427            1 + fields
2428                .iter()
2429                .map(|f| estimate_encoded_buffer_count(f.data_type()))
2430                .sum::<usize>()
2431        }
2432
2433        // Dictionary indices only; dictionary body is a separate IPC message.
2434        DataType::Dictionary(_, _) => 2,
2435
2436        DataType::Union(fields, UnionMode::Sparse) => {
2437            1 + fields
2438                .iter()
2439                .map(|(_, f)| estimate_encoded_buffer_count(f.data_type()))
2440                .sum::<usize>()
2441        }
2442        DataType::Union(fields, UnionMode::Dense) => {
2443            2 + fields
2444                .iter()
2445                .map(|(_, f)| estimate_encoded_buffer_count(f.data_type()))
2446                .sum::<usize>()
2447        }
2448
2449        DataType::RunEndEncoded(run_ends, values) => {
2450            estimate_encoded_buffer_count(run_ends.data_type())
2451                + estimate_encoded_buffer_count(values.data_type())
2452        }
2453        // Primitive, Bool, temporal, Decimal*, FixedSizeBinary: validity + values.
2454        _ => 2,
2455    }
2456}
2457
/// Returns how many padding bytes must follow `len` bytes so that the padded
/// length is a multiple of `alignment`.
///
/// The rounding is done with a bit mask, so `alignment` is expected to be a
/// non-zero power of two.
#[inline]
fn pad_to_alignment(alignment: u8, len: usize) -> usize {
    let mask = usize::from(alignment - 1);
    // Round `len` up to the next multiple of `alignment`.
    let rounded_up = (len + mask) & !mask;
    rounded_up - len
}
2464
2465#[cfg(test)]
2466mod tests {
2467    use std::hash::Hasher;
2468    use std::io::Cursor;
2469    use std::io::Seek;
2470
2471    use arrow_array::builder::FixedSizeListBuilder;
2472    use arrow_array::builder::Float32Builder;
2473    use arrow_array::builder::Int64Builder;
2474    use arrow_array::builder::MapBuilder;
2475    use arrow_array::builder::StringViewBuilder;
2476    use arrow_array::builder::UnionBuilder;
2477    use arrow_array::builder::{
2478        GenericListBuilder, GenericListViewBuilder, ListBuilder, StringBuilder,
2479    };
2480    use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
2481    use arrow_array::types::*;
2482    use arrow_buffer::ScalarBuffer;
2483
2484    use crate::MetadataVersion;
2485    use crate::convert::fb_to_schema;
2486    use crate::reader::*;
2487    use crate::root_as_footer;
2488
2489    use super::*;
2490
2491    fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
2492        let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
2493        writer.write(rb).unwrap();
2494        writer.finish().unwrap();
2495        writer.into_inner().unwrap()
2496    }
2497
2498    fn deserialize_file(bytes: Vec<u8>) -> RecordBatch {
2499        let mut reader = FileReader::try_new(Cursor::new(bytes), None).unwrap();
2500        reader.next().unwrap().unwrap()
2501    }
2502
2503    fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
2504        // Use 8-byte alignment so that the various `truncate_*` tests can be compactly written,
2505        // without needing to construct a giant array to spill over the 64-byte default alignment
2506        // boundary.
2507        const IPC_ALIGNMENT: usize = 8;
2508
2509        let mut stream_writer = StreamWriter::try_new_with_options(
2510            vec![],
2511            record.schema_ref(),
2512            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
2513        )
2514        .unwrap();
2515        stream_writer.write(record).unwrap();
2516        stream_writer.finish().unwrap();
2517        stream_writer.into_inner().unwrap()
2518    }
2519
2520    fn deserialize_stream(bytes: Vec<u8>) -> RecordBatch {
2521        let mut stream_reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
2522        stream_reader.next().unwrap().unwrap()
2523    }
2524
2525    #[test]
2526    #[cfg(feature = "lz4")]
2527    fn test_write_empty_record_batch_lz4_compression() {
2528        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2529        let values: Vec<Option<i32>> = vec![];
2530        let array = Int32Array::from(values);
2531        let record_batch =
2532            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2533
2534        let mut file = tempfile::tempfile().unwrap();
2535
2536        {
2537            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2538                .unwrap()
2539                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
2540                .unwrap();
2541
2542            let mut writer =
2543                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2544            writer.write(&record_batch).unwrap();
2545            writer.finish().unwrap();
2546        }
2547        file.rewind().unwrap();
2548        {
2549            // read file
2550            let reader = FileReader::try_new(file, None).unwrap();
2551            for read_batch in reader {
2552                read_batch
2553                    .unwrap()
2554                    .columns()
2555                    .iter()
2556                    .zip(record_batch.columns())
2557                    .for_each(|(a, b)| {
2558                        assert_eq!(a.data_type(), b.data_type());
2559                        assert_eq!(a.len(), b.len());
2560                        assert_eq!(a.null_count(), b.null_count());
2561                    });
2562            }
2563        }
2564    }
2565
2566    #[test]
2567    #[cfg(feature = "lz4")]
2568    fn test_write_file_with_lz4_compression() {
2569        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2570        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
2571        let array = Int32Array::from(values);
2572        let record_batch =
2573            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2574
2575        let mut file = tempfile::tempfile().unwrap();
2576        {
2577            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2578                .unwrap()
2579                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
2580                .unwrap();
2581
2582            let mut writer =
2583                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2584            writer.write(&record_batch).unwrap();
2585            writer.finish().unwrap();
2586        }
2587        file.rewind().unwrap();
2588        {
2589            // read file
2590            let reader = FileReader::try_new(file, None).unwrap();
2591            for read_batch in reader {
2592                read_batch
2593                    .unwrap()
2594                    .columns()
2595                    .iter()
2596                    .zip(record_batch.columns())
2597                    .for_each(|(a, b)| {
2598                        assert_eq!(a.data_type(), b.data_type());
2599                        assert_eq!(a.len(), b.len());
2600                        assert_eq!(a.null_count(), b.null_count());
2601                    });
2602            }
2603        }
2604    }
2605
2606    #[test]
2607    #[cfg(feature = "zstd")]
2608    fn test_write_file_with_zstd_compression() {
2609        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2610        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
2611        let array = Int32Array::from(values);
2612        let record_batch =
2613            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2614        let mut file = tempfile::tempfile().unwrap();
2615        {
2616            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2617                .unwrap()
2618                .try_with_compression(Some(crate::CompressionType::ZSTD))
2619                .unwrap()
2620                .try_with_compression_level(Some(1))
2621                .unwrap();
2622
2623            let mut writer =
2624                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2625            writer.write(&record_batch).unwrap();
2626            writer.finish().unwrap();
2627        }
2628        file.rewind().unwrap();
2629        {
2630            // read file
2631            let reader = FileReader::try_new(file, None).unwrap();
2632            for read_batch in reader {
2633                read_batch
2634                    .unwrap()
2635                    .columns()
2636                    .iter()
2637                    .zip(record_batch.columns())
2638                    .for_each(|(a, b)| {
2639                        assert_eq!(a.data_type(), b.data_type());
2640                        assert_eq!(a.len(), b.len());
2641                        assert_eq!(a.null_count(), b.null_count());
2642                    });
2643            }
2644        }
2645    }
2646
2647    #[test]
2648    fn test_write_file() {
2649        let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
2650        let values: Vec<Option<u32>> = vec![
2651            Some(999),
2652            None,
2653            Some(235),
2654            Some(123),
2655            None,
2656            None,
2657            None,
2658            None,
2659            None,
2660        ];
2661        let array1 = UInt32Array::from(values);
2662        let batch =
2663            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array1) as ArrayRef])
2664                .unwrap();
2665        let mut file = tempfile::tempfile().unwrap();
2666        {
2667            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2668
2669            writer.write(&batch).unwrap();
2670            writer.finish().unwrap();
2671        }
2672        file.rewind().unwrap();
2673
2674        {
2675            let mut reader = FileReader::try_new(file, None).unwrap();
2676            while let Some(Ok(read_batch)) = reader.next() {
2677                read_batch
2678                    .columns()
2679                    .iter()
2680                    .zip(batch.columns())
2681                    .for_each(|(a, b)| {
2682                        assert_eq!(a.data_type(), b.data_type());
2683                        assert_eq!(a.len(), b.len());
2684                        assert_eq!(a.null_count(), b.null_count());
2685                    });
2686            }
2687        }
2688    }
2689
2690    #[test]
2691    fn test_empty_utf8_ipc_writes_nonempty_offsets_buffer() {
2692        let name = StringArray::from(Vec::<String>::new());
2693        let (offsets, values) = get_byte_array_buffers::<i32>(&name.to_data());
2694
2695        assert_eq!(name.len(), 0);
2696        assert_eq!(
2697            offsets.len(),
2698            std::mem::size_of::<i32>(),
2699            "offsets buffer should contain one zero i32 offset"
2700        );
2701        assert_eq!(values.len(), 0, "values buffer should remain empty");
2702    }
2703
2704    #[test]
2705    fn test_empty_large_utf8_ipc_writes_nonempty_offsets_buffer() {
2706        let name = LargeStringArray::from(Vec::<String>::new());
2707        let (offsets, values) = get_byte_array_buffers::<i64>(&name.to_data());
2708
2709        assert_eq!(name.len(), 0);
2710        assert_eq!(
2711            offsets.len(),
2712            std::mem::size_of::<i64>(),
2713            "offsets buffer should contain one zero i64 offset"
2714        );
2715        assert_eq!(values.len(), 0, "values buffer should remain empty");
2716    }
2717
2718    #[test]
2719    fn test_empty_list_ipc_writes_nonempty_offsets_buffer() {
2720        let list = GenericListBuilder::<i32, _>::new(UInt32Builder::new()).finish();
2721        let (offsets, child_data) = get_list_array_buffers::<i32>(&list.to_data());
2722
2723        assert_eq!(list.len(), 0);
2724        assert_eq!(
2725            offsets.len(),
2726            std::mem::size_of::<i32>(),
2727            "offsets buffer should contain one zero i32 offset"
2728        );
2729        assert_eq!(child_data.len(), 0, "child data should remain empty");
2730    }
2731
2732    #[test]
2733    fn test_empty_large_list_ipc_writes_nonempty_offsets_buffer() {
2734        let list = GenericListBuilder::<i64, _>::new(UInt32Builder::new()).finish();
2735        let (offsets, child_data) = get_list_array_buffers::<i64>(&list.to_data());
2736
2737        assert_eq!(list.len(), 0);
2738        assert_eq!(
2739            offsets.len(),
2740            std::mem::size_of::<i64>(),
2741            "offsets buffer should contain one zero i64 offset"
2742        );
2743        assert_eq!(child_data.len(), 0, "child data should remain empty");
2744    }
2745
2746    fn write_null_file(options: IpcWriteOptions) {
2747        let schema = Schema::new(vec![
2748            Field::new("nulls", DataType::Null, true),
2749            Field::new("int32s", DataType::Int32, false),
2750            Field::new("nulls2", DataType::Null, true),
2751            Field::new("f64s", DataType::Float64, false),
2752        ]);
2753        let array1 = NullArray::new(32);
2754        let array2 = Int32Array::from(vec![1; 32]);
2755        let array3 = NullArray::new(32);
2756        let array4 = Float64Array::from(vec![f64::NAN; 32]);
2757        let batch = RecordBatch::try_new(
2758            Arc::new(schema.clone()),
2759            vec![
2760                Arc::new(array1) as ArrayRef,
2761                Arc::new(array2) as ArrayRef,
2762                Arc::new(array3) as ArrayRef,
2763                Arc::new(array4) as ArrayRef,
2764            ],
2765        )
2766        .unwrap();
2767        let mut file = tempfile::tempfile().unwrap();
2768        {
2769            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2770
2771            writer.write(&batch).unwrap();
2772            writer.finish().unwrap();
2773        }
2774
2775        file.rewind().unwrap();
2776
2777        {
2778            let reader = FileReader::try_new(file, None).unwrap();
2779            reader.for_each(|maybe_batch| {
2780                maybe_batch
2781                    .unwrap()
2782                    .columns()
2783                    .iter()
2784                    .zip(batch.columns())
2785                    .for_each(|(a, b)| {
2786                        assert_eq!(a.data_type(), b.data_type());
2787                        assert_eq!(a.len(), b.len());
2788                        assert_eq!(a.null_count(), b.null_count());
2789                    });
2790            });
2791        }
2792    }
2793    #[test]
2794    fn test_write_null_file_v4() {
2795        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2796        write_null_file(IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap());
2797        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V4).unwrap());
2798        write_null_file(IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap());
2799    }
2800
2801    #[test]
2802    fn test_write_null_file_v5() {
2803        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2804        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V5).unwrap());
2805    }
2806
    #[test]
    fn track_union_nested_dict() {
        // Encodes a dense union whose only child is a dictionary array and checks that the
        // dictionary tracker records the nested dictionary as written.
        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();

        let array = Arc::new(inner) as ArrayRef;

        // Dict field with id 0
        #[allow(deprecated)]
        let dctfield = Field::new_dict("dict", array.data_type().clone(), false, 0, false);
        let union_fields = [(0, Arc::new(dctfield))].into_iter().collect();

        // Every row selects the single child (type id 0); offsets point at child rows 0..3.
        let types = [0, 0, 0].into_iter().collect::<ScalarBuffer<i8>>();
        let offsets = [0, 1, 2].into_iter().collect::<ScalarBuffer<i32>>();

        let union = UnionArray::try_new(union_fields, types, Some(offsets), vec![array]).unwrap();

        let schema = Arc::new(Schema::new(vec![Field::new(
            "union",
            union.data_type().clone(),
            false,
        )]));

        // The schema is encoded with the tracker before the batch; the returned bytes are
        // not needed here.
        let r#gen = IpcDataGenerator::default();
        let mut dict_tracker = DictionaryTracker::new(false);
        r#gen.schema_to_bytes_with_dictionary_tracker(
            &schema,
            &mut dict_tracker,
            &IpcWriteOptions::default(),
        );

        let batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();

        r#gen
            .encode(
                &batch,
                &mut dict_tracker,
                &Default::default(),
                &mut Default::default(),
            )
            .unwrap();

        // The encoder will assign dict IDs itself to ensure uniqueness and ignore the dict ID in the schema
        // so we expect the dict will be keyed to 0
        assert!(dict_tracker.written.contains_key(&0));
    }
2852
    #[test]
    fn track_struct_nested_dict() {
        // Encodes a struct whose only child is a dictionary array and checks that the
        // dictionary tracker records the nested dictionary as written.
        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();

        let array = Arc::new(inner) as ArrayRef;

        // Dict field with id 2
        #[allow(deprecated)]
        let dctfield = Arc::new(Field::new_dict(
            "dict",
            array.data_type().clone(),
            false,
            2,
            false,
        ));

        let s = StructArray::from(vec![(dctfield, array)]);
        let struct_array = Arc::new(s) as ArrayRef;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "struct",
            struct_array.data_type().clone(),
            false,
        )]));

        // The schema is encoded with the tracker before the batch; the returned bytes are
        // not needed here.
        let r#gen = IpcDataGenerator::default();
        let mut dict_tracker = DictionaryTracker::new(false);
        r#gen.schema_to_bytes_with_dictionary_tracker(
            &schema,
            &mut dict_tracker,
            &IpcWriteOptions::default(),
        );

        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();

        r#gen
            .encode(
                &batch,
                &mut dict_tracker,
                &Default::default(),
                &mut Default::default(),
            )
            .unwrap();

        // The dictionary is expected under key 0, not under the id 2 given to the field above.
        assert!(dict_tracker.written.contains_key(&0));
    }
2899
2900    fn write_union_file(options: IpcWriteOptions) {
2901        let schema = Schema::new(vec![Field::new_union(
2902            "union",
2903            vec![0, 1],
2904            vec![
2905                Field::new("a", DataType::Int32, false),
2906                Field::new("c", DataType::Float64, false),
2907            ],
2908            UnionMode::Sparse,
2909        )]);
2910        let mut builder = UnionBuilder::with_capacity_sparse(5);
2911        builder.append::<Int32Type>("a", 1).unwrap();
2912        builder.append_null::<Int32Type>("a").unwrap();
2913        builder.append::<Float64Type>("c", 3.0).unwrap();
2914        builder.append_null::<Float64Type>("c").unwrap();
2915        builder.append::<Int32Type>("a", 4).unwrap();
2916        let union = builder.build().unwrap();
2917
2918        let batch =
2919            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(union) as ArrayRef])
2920                .unwrap();
2921
2922        let mut file = tempfile::tempfile().unwrap();
2923        {
2924            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2925
2926            writer.write(&batch).unwrap();
2927            writer.finish().unwrap();
2928        }
2929        file.rewind().unwrap();
2930
2931        {
2932            let reader = FileReader::try_new(file, None).unwrap();
2933            reader.for_each(|maybe_batch| {
2934                maybe_batch
2935                    .unwrap()
2936                    .columns()
2937                    .iter()
2938                    .zip(batch.columns())
2939                    .for_each(|(a, b)| {
2940                        assert_eq!(a.data_type(), b.data_type());
2941                        assert_eq!(a.len(), b.len());
2942                        assert_eq!(a.null_count(), b.null_count());
2943                    });
2944            });
2945        }
2946    }
2947
2948    #[test]
2949    fn test_write_union_file_v4_v5() {
2950        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2951        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2952    }
2953
2954    #[test]
2955    fn test_write_view_types() {
2956        const LONG_TEST_STRING: &str =
2957            "This is a long string to make sure binary view array handles it";
2958        let schema = Schema::new(vec![
2959            Field::new("field1", DataType::BinaryView, true),
2960            Field::new("field2", DataType::Utf8View, true),
2961        ]);
2962        let values: Vec<Option<&[u8]>> = vec![
2963            Some(b"foo"),
2964            Some(b"bar"),
2965            Some(LONG_TEST_STRING.as_bytes()),
2966        ];
2967        let binary_array = BinaryViewArray::from_iter(values);
2968        let utf8_array =
2969            StringViewArray::from_iter(vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]);
2970        let record_batch = RecordBatch::try_new(
2971            Arc::new(schema.clone()),
2972            vec![Arc::new(binary_array), Arc::new(utf8_array)],
2973        )
2974        .unwrap();
2975
2976        let mut file = tempfile::tempfile().unwrap();
2977        {
2978            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2979            writer.write(&record_batch).unwrap();
2980            writer.finish().unwrap();
2981        }
2982        file.rewind().unwrap();
2983        {
2984            let mut reader = FileReader::try_new(&file, None).unwrap();
2985            let read_batch = reader.next().unwrap().unwrap();
2986            read_batch
2987                .columns()
2988                .iter()
2989                .zip(record_batch.columns())
2990                .for_each(|(a, b)| {
2991                    assert_eq!(a, b);
2992                });
2993        }
2994        file.rewind().unwrap();
2995        {
2996            let mut reader = FileReader::try_new(&file, Some(vec![0])).unwrap();
2997            let read_batch = reader.next().unwrap().unwrap();
2998            assert_eq!(read_batch.num_columns(), 1);
2999            let read_array = read_batch.column(0);
3000            let write_array = record_batch.column(0);
3001            assert_eq!(read_array, write_array);
3002        }
3003    }
3004
3005    #[test]
3006    fn truncate_ipc_record_batch() {
3007        fn create_batch(rows: usize) -> RecordBatch {
3008            let schema = Schema::new(vec![
3009                Field::new("a", DataType::Int32, false),
3010                Field::new("b", DataType::Utf8, false),
3011            ]);
3012
3013            let a = Int32Array::from_iter_values(0..rows as i32);
3014            let b = StringArray::from_iter_values((0..rows).map(|i| i.to_string()));
3015
3016            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
3017        }
3018
3019        let big_record_batch = create_batch(65536);
3020
3021        let length = 5;
3022        let small_record_batch = create_batch(length);
3023
3024        let offset = 2;
3025        let record_batch_slice = big_record_batch.slice(offset, length);
3026        assert!(
3027            serialize_stream(&big_record_batch).len() > serialize_stream(&small_record_batch).len()
3028        );
3029        assert_eq!(
3030            serialize_stream(&small_record_batch).len(),
3031            serialize_stream(&record_batch_slice).len()
3032        );
3033
3034        assert_eq!(
3035            deserialize_stream(serialize_stream(&record_batch_slice)),
3036            record_batch_slice
3037        );
3038    }
3039
3040    #[test]
3041    fn truncate_ipc_record_batch_with_nulls() {
3042        fn create_batch() -> RecordBatch {
3043            let schema = Schema::new(vec![
3044                Field::new("a", DataType::Int32, true),
3045                Field::new("b", DataType::Utf8, true),
3046            ]);
3047
3048            let a = Int32Array::from(vec![Some(1), None, Some(1), None, Some(1)]);
3049            let b = StringArray::from(vec![None, Some("a"), Some("a"), None, Some("a")]);
3050
3051            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
3052        }
3053
3054        let record_batch = create_batch();
3055        let record_batch_slice = record_batch.slice(1, 2);
3056        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
3057
3058        assert!(
3059            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
3060        );
3061
3062        assert!(deserialized_batch.column(0).is_null(0));
3063        assert!(deserialized_batch.column(0).is_valid(1));
3064        assert!(deserialized_batch.column(1).is_valid(0));
3065        assert!(deserialized_batch.column(1).is_valid(1));
3066
3067        assert_eq!(record_batch_slice, deserialized_batch);
3068    }
3069
3070    #[test]
3071    fn truncate_ipc_dictionary_array() {
3072        fn create_batch() -> RecordBatch {
3073            let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
3074                .into_iter()
3075                .collect();
3076            let keys: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
3077
3078            let array = DictionaryArray::new(keys, Arc::new(values));
3079
3080            let schema = Schema::new(vec![Field::new("dict", array.data_type().clone(), true)]);
3081
3082            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
3083        }
3084
3085        let record_batch = create_batch();
3086        let record_batch_slice = record_batch.slice(1, 2);
3087        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
3088
3089        assert!(
3090            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
3091        );
3092
3093        assert!(deserialized_batch.column(0).is_valid(0));
3094        assert!(deserialized_batch.column(0).is_null(1));
3095
3096        assert_eq!(record_batch_slice, deserialized_batch);
3097    }
3098
3099    #[test]
3100    fn truncate_ipc_struct_array() {
3101        fn create_batch() -> RecordBatch {
3102            let strings: StringArray = [Some("foo"), None, Some("bar"), Some("baz")]
3103                .into_iter()
3104                .collect();
3105            let ints: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
3106
3107            let struct_array = StructArray::from(vec![
3108                (
3109                    Arc::new(Field::new("s", DataType::Utf8, true)),
3110                    Arc::new(strings) as ArrayRef,
3111                ),
3112                (
3113                    Arc::new(Field::new("c", DataType::Int32, true)),
3114                    Arc::new(ints) as ArrayRef,
3115                ),
3116            ]);
3117
3118            let schema = Schema::new(vec![Field::new(
3119                "struct_array",
3120                struct_array.data_type().clone(),
3121                true,
3122            )]);
3123
3124            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(struct_array)]).unwrap()
3125        }
3126
3127        let record_batch = create_batch();
3128        let record_batch_slice = record_batch.slice(1, 2);
3129        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
3130
3131        assert!(
3132            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
3133        );
3134
3135        let structs = deserialized_batch
3136            .column(0)
3137            .as_any()
3138            .downcast_ref::<StructArray>()
3139            .unwrap();
3140
3141        assert!(structs.column(0).is_null(0));
3142        assert!(structs.column(0).is_valid(1));
3143        assert!(structs.column(1).is_valid(0));
3144        assert!(structs.column(1).is_null(1));
3145        assert_eq!(record_batch_slice, deserialized_batch);
3146    }
3147
3148    #[test]
3149    fn truncate_ipc_string_array_with_all_empty_string() {
3150        fn create_batch() -> RecordBatch {
3151            let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
3152            let a = StringArray::from(vec![Some(""), Some(""), Some(""), Some(""), Some("")]);
3153            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap()
3154        }
3155
3156        let record_batch = create_batch();
3157        let record_batch_slice = record_batch.slice(0, 1);
3158        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
3159
3160        assert!(
3161            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
3162        );
3163        assert_eq!(record_batch_slice, deserialized_batch);
3164    }
3165
3166    #[test]
3167    fn test_stream_writer_writes_array_slice() {
3168        let array = UInt32Array::from(vec![Some(1), Some(2), Some(3)]);
3169        assert_eq!(
3170            vec![Some(1), Some(2), Some(3)],
3171            array.iter().collect::<Vec<_>>()
3172        );
3173
3174        let sliced = array.slice(1, 2);
3175        assert_eq!(vec![Some(2), Some(3)], sliced.iter().collect::<Vec<_>>());
3176
3177        let batch = RecordBatch::try_new(
3178            Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)])),
3179            vec![Arc::new(sliced)],
3180        )
3181        .expect("new batch");
3182
3183        let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
3184        writer.write(&batch).expect("write");
3185        let outbuf = writer.into_inner().expect("inner");
3186
3187        let mut reader = StreamReader::try_new(&outbuf[..], None).expect("new reader");
3188        let read_batch = reader.next().unwrap().expect("read batch");
3189
3190        let read_array: &UInt32Array = read_batch.column(0).as_primitive();
3191        assert_eq!(
3192            vec![Some(2), Some(3)],
3193            read_array.iter().collect::<Vec<_>>()
3194        );
3195    }
3196
3197    #[test]
3198    fn test_large_slice_uint32() {
3199        ensure_roundtrip(Arc::new(UInt32Array::from_iter(
3200            (0..8000).map(|i| if i % 2 == 0 { Some(i) } else { None }),
3201        )));
3202    }
3203
3204    #[test]
3205    fn test_large_slice_string() {
3206        let strings: Vec<_> = (0..8000)
3207            .map(|i| {
3208                if i % 2 == 0 {
3209                    Some(format!("value{i}"))
3210                } else {
3211                    None
3212                }
3213            })
3214            .collect();
3215
3216        ensure_roundtrip(Arc::new(StringArray::from(strings)));
3217    }
3218
3219    #[test]
3220    fn test_large_slice_string_list() {
3221        let mut ls = ListBuilder::new(StringBuilder::new());
3222
3223        let mut s = String::new();
3224        for row_number in 0..8000 {
3225            if row_number % 2 == 0 {
3226                for list_element in 0..1000 {
3227                    s.clear();
3228                    use std::fmt::Write;
3229                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
3230                    ls.values().append_value(&s);
3231                }
3232                ls.append(true)
3233            } else {
3234                ls.append(false); // null
3235            }
3236        }
3237
3238        ensure_roundtrip(Arc::new(ls.finish()));
3239    }
3240
3241    #[test]
3242    fn test_large_slice_string_list_of_lists() {
3243        // The reason for the special test is to verify reencode_offsets which looks both at
3244        // the starting offset and the data offset.  So need a dataset where the starting_offset
3245        // is zero but the data offset is not.
3246        let mut ls = ListBuilder::new(ListBuilder::new(StringBuilder::new()));
3247
3248        for _ in 0..4000 {
3249            ls.values().append(true);
3250            ls.append(true)
3251        }
3252
3253        let mut s = String::new();
3254        for row_number in 0..4000 {
3255            if row_number % 2 == 0 {
3256                for list_element in 0..1000 {
3257                    s.clear();
3258                    use std::fmt::Write;
3259                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
3260                    ls.values().values().append_value(&s);
3261                }
3262                ls.values().append(true);
3263                ls.append(true)
3264            } else {
3265                ls.append(false); // null
3266            }
3267        }
3268
3269        ensure_roundtrip(Arc::new(ls.finish()));
3270    }
3271
3272    /// Read/write a record batch to a File and Stream and ensure it is the same at the outout
3273    fn ensure_roundtrip(array: ArrayRef) {
3274        let num_rows = array.len();
3275        let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
3276        // take off the first element
3277        let sliced_batch = orig_batch.slice(1, num_rows - 1);
3278
3279        let schema = orig_batch.schema();
3280        let stream_data = {
3281            let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
3282            writer.write(&sliced_batch).unwrap();
3283            writer.into_inner().unwrap()
3284        };
3285        let read_batch = {
3286            let projection = None;
3287            let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
3288            reader
3289                .next()
3290                .expect("expect no errors reading batch")
3291                .expect("expect batch")
3292        };
3293        assert_eq!(sliced_batch, read_batch);
3294
3295        let file_data = {
3296            let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
3297            writer.write(&sliced_batch).unwrap();
3298            writer.into_inner().unwrap().into_inner().unwrap()
3299        };
3300        let read_batch = {
3301            let projection = None;
3302            let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
3303            reader
3304                .next()
3305                .expect("expect no errors reading batch")
3306                .expect("expect batch")
3307        };
3308        assert_eq!(sliced_batch, read_batch);
3309
3310        // TODO test file writer/reader
3311    }
3312
3313    #[test]
3314    fn encode_bools_slice() {
3315        // Test case for https://github.com/apache/arrow-rs/issues/3496
3316        assert_bool_roundtrip([true, false], 1, 1);
3317
3318        // slice somewhere in the middle
3319        assert_bool_roundtrip(
3320            [
3321                true, false, true, true, false, false, true, true, true, false, false, false, true,
3322                true, true, true, false, false, false, false, true, true, true, true, true, false,
3323                false, false, false, false,
3324            ],
3325            13,
3326            17,
3327        );
3328
3329        // start at byte boundary, end in the middle
3330        assert_bool_roundtrip(
3331            [
3332                true, false, true, true, false, false, true, true, true, false, false, false,
3333            ],
3334            8,
3335            2,
3336        );
3337
3338        // start and stop and byte boundary
3339        assert_bool_roundtrip(
3340            [
3341                true, false, true, true, false, false, true, true, true, false, false, false, true,
3342                true, true, true, true, false, false, false, false, false,
3343            ],
3344            8,
3345            8,
3346        );
3347    }
3348
3349    fn assert_bool_roundtrip<const N: usize>(bools: [bool; N], offset: usize, length: usize) {
3350        let val_bool_field = Field::new("val", DataType::Boolean, false);
3351
3352        let schema = Arc::new(Schema::new(vec![val_bool_field]));
3353
3354        let bools = BooleanArray::from(bools.to_vec());
3355
3356        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(bools)]).unwrap();
3357        let batch = batch.slice(offset, length);
3358
3359        let data = serialize_stream(&batch);
3360        let batch2 = deserialize_stream(data);
3361        assert_eq!(batch, batch2);
3362    }
3363
3364    #[test]
3365    fn test_run_array_unslice() {
3366        let total_len = 80;
3367        let vals: Vec<Option<i32>> = vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
3368        let repeats: Vec<usize> = vec![3, 4, 1, 2];
3369        let mut input_array: Vec<Option<i32>> = Vec::with_capacity(total_len);
3370        for ix in 0_usize..32 {
3371            let repeat: usize = repeats[ix % repeats.len()];
3372            let val: Option<i32> = vals[ix % vals.len()];
3373            input_array.resize(input_array.len() + repeat, val);
3374        }
3375
3376        // Encode the input_array to run array
3377        let mut builder =
3378            PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
3379        builder.extend(input_array.iter().copied());
3380        let run_array = builder.finish();
3381
3382        // test for all slice lengths.
3383        for slice_len in 1..=total_len {
3384            // test for offset = 0, slice length = slice_len
3385            let sliced_run_array: RunArray<Int16Type> =
3386                run_array.slice(0, slice_len).into_data().into();
3387
3388            // Create unsliced run array.
3389            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
3390            let typed = unsliced_run_array
3391                .downcast::<PrimitiveArray<Int32Type>>()
3392                .unwrap();
3393            let expected: Vec<Option<i32>> = input_array.iter().take(slice_len).copied().collect();
3394            let actual: Vec<Option<i32>> = typed.into_iter().collect();
3395            assert_eq!(expected, actual);
3396
3397            // test for offset = total_len - slice_len, length = slice_len
3398            let sliced_run_array: RunArray<Int16Type> = run_array
3399                .slice(total_len - slice_len, slice_len)
3400                .into_data()
3401                .into();
3402
3403            // Create unsliced run array.
3404            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
3405            let typed = unsliced_run_array
3406                .downcast::<PrimitiveArray<Int32Type>>()
3407                .unwrap();
3408            let expected: Vec<Option<i32>> = input_array
3409                .iter()
3410                .skip(total_len - slice_len)
3411                .copied()
3412                .collect();
3413            let actual: Vec<Option<i32>> = typed.into_iter().collect();
3414            assert_eq!(expected, actual);
3415        }
3416    }
3417
3418    fn generate_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3419        let mut ls = GenericListBuilder::<O, _>::new(UInt32Builder::new());
3420
3421        for i in 0..100_000 {
3422            for value in [i, i, i] {
3423                ls.values().append_value(value);
3424            }
3425            ls.append(true)
3426        }
3427
3428        ls.finish()
3429    }
3430
3431    fn generate_utf8view_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3432        let mut ls = GenericListBuilder::<O, _>::new(StringViewBuilder::new());
3433
3434        for i in 0..100_000 {
3435            for value in [
3436                format!("value{}", i),
3437                format!("value{}", i),
3438                format!("value{}", i),
3439            ] {
3440                ls.values().append_value(&value);
3441            }
3442            ls.append(true)
3443        }
3444
3445        ls.finish()
3446    }
3447
3448    fn generate_string_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3449        let mut ls = GenericListBuilder::<O, _>::new(StringBuilder::new());
3450
3451        for i in 0..100_000 {
3452            for value in [
3453                format!("value{}", i),
3454                format!("value{}", i),
3455                format!("value{}", i),
3456            ] {
3457                ls.values().append_value(&value);
3458            }
3459            ls.append(true)
3460        }
3461
3462        ls.finish()
3463    }
3464
3465    fn generate_nested_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3466        let mut ls =
3467            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
3468
3469        for _i in 0..10_000 {
3470            for j in 0..10 {
3471                for value in [j, j, j, j] {
3472                    ls.values().values().append_value(value);
3473                }
3474                ls.values().append(true)
3475            }
3476            ls.append(true);
3477        }
3478
3479        ls.finish()
3480    }
3481
3482    fn generate_nested_list_data_starting_at_zero<O: OffsetSizeTrait>() -> GenericListArray<O> {
3483        let mut ls =
3484            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
3485
3486        for _i in 0..999 {
3487            ls.values().append(true);
3488            ls.append(true);
3489        }
3490
3491        for j in 0..10 {
3492            for value in [j, j, j, j] {
3493                ls.values().values().append_value(value);
3494            }
3495            ls.values().append(true)
3496        }
3497        ls.append(true);
3498
3499        for i in 0..9_000 {
3500            for j in 0..10 {
3501                for value in [i + j, i + j, i + j, i + j] {
3502                    ls.values().values().append_value(value);
3503                }
3504                ls.values().append(true)
3505            }
3506            ls.append(true);
3507        }
3508
3509        ls.finish()
3510    }
3511
3512    fn generate_map_array_data() -> MapArray {
3513        let keys_builder = UInt32Builder::new();
3514        let values_builder = UInt32Builder::new();
3515
3516        let mut builder = MapBuilder::new(None, keys_builder, values_builder);
3517
3518        for i in 0..100_000 {
3519            for _j in 0..3 {
3520                builder.keys().append_value(i);
3521                builder.values().append_value(i * 2);
3522            }
3523            builder.append(true).unwrap();
3524        }
3525
3526        builder.finish()
3527    }
3528
3529    #[test]
3530    fn reencode_offsets_when_first_offset_is_not_zero() {
3531        let original_list = generate_list_data::<i32>();
3532        let original_data = original_list.into_data();
3533        let slice_data = original_data.slice(75, 7);
3534        let (new_offsets, original_start, length) =
3535            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
3536        assert_eq!(
3537            vec![0, 3, 6, 9, 12, 15, 18, 21],
3538            new_offsets.typed_data::<i32>()
3539        );
3540        assert_eq!(225, original_start);
3541        assert_eq!(21, length);
3542    }
3543
3544    #[test]
3545    fn reencode_offsets_when_first_offset_is_zero() {
3546        let mut ls = GenericListBuilder::<i32, _>::new(UInt32Builder::new());
3547        // ls = [[], [35, 42]
3548        ls.append(true);
3549        ls.values().append_value(35);
3550        ls.values().append_value(42);
3551        ls.append(true);
3552        let original_list = ls.finish();
3553        let original_data = original_list.into_data();
3554
3555        let slice_data = original_data.slice(1, 1);
3556        let (new_offsets, original_start, length) =
3557            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
3558        assert_eq!(vec![0, 2], new_offsets.typed_data::<i32>());
3559        assert_eq!(0, original_start);
3560        assert_eq!(2, length);
3561    }
3562
3563    /// Ensure when serde full & sliced versions they are equal to original input.
3564    /// Also ensure serialized sliced version is significantly smaller than serialized full.
3565    fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {
3566        // test both full and sliced versions
3567        let in_sliced = in_batch.slice(999, 1);
3568
3569        let bytes_batch = serialize_file(&in_batch);
3570        let bytes_sliced = serialize_file(&in_sliced);
3571
3572        // serializing 1 row should be significantly smaller than serializing 100,000
3573        assert!(bytes_sliced.len() < (bytes_batch.len() / expected_size_factor));
3574
3575        // ensure both are still valid and equal to originals
3576        let out_batch = deserialize_file(bytes_batch);
3577        assert_eq!(in_batch, out_batch);
3578
3579        let out_sliced = deserialize_file(bytes_sliced);
3580        assert_eq!(in_sliced, out_sliced);
3581    }
3582
3583    #[test]
3584    fn encode_lists() {
3585        let val_inner = Field::new_list_field(DataType::UInt32, true);
3586        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
3587        let schema = Arc::new(Schema::new(vec![val_list_field]));
3588
3589        let values = Arc::new(generate_list_data::<i32>());
3590
3591        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3592        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3593    }
3594
3595    #[test]
3596    fn encode_empty_list() {
3597        let val_inner = Field::new_list_field(DataType::UInt32, true);
3598        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
3599        let schema = Arc::new(Schema::new(vec![val_list_field]));
3600
3601        let values = Arc::new(generate_list_data::<i32>());
3602
3603        let in_batch = RecordBatch::try_new(schema, vec![values])
3604            .unwrap()
3605            .slice(999, 0);
3606        let out_batch = deserialize_file(serialize_file(&in_batch));
3607        assert_eq!(in_batch, out_batch);
3608    }
3609
3610    #[test]
3611    fn encode_large_lists() {
3612        let val_inner = Field::new_list_field(DataType::UInt32, true);
3613        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3614        let schema = Arc::new(Schema::new(vec![val_list_field]));
3615
3616        let values = Arc::new(generate_list_data::<i64>());
3617
3618        // ensure when serde full & sliced versions they are equal to original input
3619        // also ensure serialized sliced version is significantly smaller than serialized full
3620        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3621        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3622    }
3623
3624    #[test]
3625    fn encode_large_lists_non_zero_offset() {
3626        let val_inner = Field::new_list_field(DataType::UInt32, true);
3627        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3628        let schema = Arc::new(Schema::new(vec![val_list_field]));
3629
3630        let values = Arc::new(generate_list_data::<i64>());
3631
3632        check_sliced_list_array(schema, values);
3633    }
3634
3635    #[test]
3636    fn encode_large_lists_string_non_zero_offset() {
3637        let val_inner = Field::new_list_field(DataType::Utf8, true);
3638        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3639        let schema = Arc::new(Schema::new(vec![val_list_field]));
3640
3641        let values = Arc::new(generate_string_list_data::<i64>());
3642
3643        check_sliced_list_array(schema, values);
3644    }
3645
3646    #[test]
3647    fn encode_large_list_string_view_non_zero_offset() {
3648        let val_inner = Field::new_list_field(DataType::Utf8View, true);
3649        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3650        let schema = Arc::new(Schema::new(vec![val_list_field]));
3651
3652        let values = Arc::new(generate_utf8view_list_data::<i64>());
3653
3654        check_sliced_list_array(schema, values);
3655    }
3656
3657    fn check_sliced_list_array(schema: Arc<Schema>, values: Arc<GenericListArray<i64>>) {
3658        for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3659            let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3660                .unwrap()
3661                .slice(offset, len);
3662            let out_batch = deserialize_file(serialize_file(&in_batch));
3663            assert_eq!(in_batch, out_batch);
3664        }
3665    }
3666
3667    #[test]
3668    fn encode_nested_lists() {
3669        let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
3670        let inner_list_field = Arc::new(Field::new_list_field(DataType::List(inner_int), true));
3671        let list_field = Field::new("val", DataType::List(inner_list_field), true);
3672        let schema = Arc::new(Schema::new(vec![list_field]));
3673
3674        let values = Arc::new(generate_nested_list_data::<i32>());
3675
3676        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3677        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3678    }
3679
3680    #[test]
3681    fn encode_nested_lists_starting_at_zero() {
3682        let inner_int = Arc::new(Field::new("item", DataType::UInt32, true));
3683        let inner_list_field = Arc::new(Field::new("item", DataType::List(inner_int), true));
3684        let list_field = Field::new("val", DataType::List(inner_list_field), true);
3685        let schema = Arc::new(Schema::new(vec![list_field]));
3686
3687        let values = Arc::new(generate_nested_list_data_starting_at_zero::<i32>());
3688
3689        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3690        roundtrip_ensure_sliced_smaller(in_batch, 1);
3691    }
3692
3693    #[test]
3694    fn encode_map_array() {
3695        let keys = Arc::new(Field::new("keys", DataType::UInt32, false));
3696        let values = Arc::new(Field::new("values", DataType::UInt32, true));
3697        let map_field = Field::new_map("map", "entries", keys, values, false, true);
3698        let schema = Arc::new(Schema::new(vec![map_field]));
3699
3700        let values = Arc::new(generate_map_array_data());
3701
3702        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3703        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3704    }
3705
3706    fn generate_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
3707        let mut builder = GenericListViewBuilder::<O, _>::new(UInt32Builder::new());
3708
3709        for i in 0u32..100_000 {
3710            if i.is_multiple_of(10_000) {
3711                builder.append(false);
3712                continue;
3713            }
3714            for value in [i, i, i] {
3715                builder.values().append_value(value);
3716            }
3717            builder.append(true);
3718        }
3719
3720        builder.finish()
3721    }
3722
3723    #[test]
3724    fn encode_list_view_arrays() {
3725        let val_inner = Field::new_list_field(DataType::UInt32, true);
3726        let val_field = Field::new("val", DataType::ListView(Arc::new(val_inner)), true);
3727        let schema = Arc::new(Schema::new(vec![val_field]));
3728
3729        let values = Arc::new(generate_list_view_data::<i32>());
3730
3731        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3732        let out_batch = deserialize_file(serialize_file(&in_batch));
3733        assert_eq!(in_batch, out_batch);
3734    }
3735
3736    #[test]
3737    fn encode_large_list_view_arrays() {
3738        let val_inner = Field::new_list_field(DataType::UInt32, true);
3739        let val_field = Field::new("val", DataType::LargeListView(Arc::new(val_inner)), true);
3740        let schema = Arc::new(Schema::new(vec![val_field]));
3741
3742        let values = Arc::new(generate_list_view_data::<i64>());
3743
3744        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3745        let out_batch = deserialize_file(serialize_file(&in_batch));
3746        assert_eq!(in_batch, out_batch);
3747    }
3748
3749    #[test]
3750    fn check_sliced_list_view_array() {
3751        let inner = Field::new_list_field(DataType::UInt32, true);
3752        let field = Field::new("val", DataType::ListView(Arc::new(inner)), true);
3753        let schema = Arc::new(Schema::new(vec![field]));
3754        let values = Arc::new(generate_list_view_data::<i32>());
3755
3756        for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3757            let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3758                .unwrap()
3759                .slice(offset, len);
3760            let out_batch = deserialize_file(serialize_file(&in_batch));
3761            assert_eq!(in_batch, out_batch);
3762        }
3763    }
3764
3765    #[test]
3766    fn check_sliced_large_list_view_array() {
3767        let inner = Field::new_list_field(DataType::UInt32, true);
3768        let field = Field::new("val", DataType::LargeListView(Arc::new(inner)), true);
3769        let schema = Arc::new(Schema::new(vec![field]));
3770        let values = Arc::new(generate_list_view_data::<i64>());
3771
3772        for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3773            let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3774                .unwrap()
3775                .slice(offset, len);
3776            let out_batch = deserialize_file(serialize_file(&in_batch));
3777            assert_eq!(in_batch, out_batch);
3778        }
3779    }
3780
3781    fn generate_nested_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
3782        let inner_builder = UInt32Builder::new();
3783        let middle_builder = GenericListViewBuilder::<O, _>::new(inner_builder);
3784        let mut outer_builder = GenericListViewBuilder::<O, _>::new(middle_builder);
3785
3786        for i in 0u32..10_000 {
3787            if i.is_multiple_of(1_000) {
3788                outer_builder.append(false);
3789                continue;
3790            }
3791
3792            for _ in 0..3 {
3793                for value in [i, i + 1, i + 2] {
3794                    outer_builder.values().values().append_value(value);
3795                }
3796                outer_builder.values().append(true);
3797            }
3798            outer_builder.append(true);
3799        }
3800
3801        outer_builder.finish()
3802    }
3803
3804    #[test]
3805    fn encode_nested_list_views() {
3806        let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
3807        let inner_list_field = Arc::new(Field::new_list_field(DataType::ListView(inner_int), true));
3808        let list_field = Field::new("val", DataType::ListView(inner_list_field), true);
3809        let schema = Arc::new(Schema::new(vec![list_field]));
3810
3811        let values = Arc::new(generate_nested_list_view_data::<i32>());
3812
3813        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3814        let out_batch = deserialize_file(serialize_file(&in_batch));
3815        assert_eq!(in_batch, out_batch);
3816    }
3817
3818    fn test_roundtrip_list_view_of_dict_impl<OffsetSize: OffsetSizeTrait, U: ArrowNativeType>(
3819        list_data_type: DataType,
3820        offsets: &[U; 5],
3821        sizes: &[U; 4],
3822    ) {
3823        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3824        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
3825        let dict_array = DictionaryArray::new(keys, Arc::new(values));
3826        let dict_data = dict_array.to_data();
3827
3828        let value_offsets = Buffer::from_slice_ref(offsets);
3829        let value_sizes = Buffer::from_slice_ref(sizes);
3830
3831        let list_data = ArrayData::builder(list_data_type)
3832            .len(4)
3833            .add_buffer(value_offsets)
3834            .add_buffer(value_sizes)
3835            .add_child_data(dict_data)
3836            .build()
3837            .unwrap();
3838        let list_view_array = GenericListViewArray::<OffsetSize>::from(list_data);
3839
3840        let schema = Arc::new(Schema::new(vec![Field::new(
3841            "f1",
3842            list_view_array.data_type().clone(),
3843            false,
3844        )]));
3845        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(list_view_array)]).unwrap();
3846
3847        let output_batch = deserialize_file(serialize_file(&input_batch));
3848        assert_eq!(input_batch, output_batch);
3849
3850        let output_batch = deserialize_stream(serialize_stream(&input_batch));
3851        assert_eq!(input_batch, output_batch);
3852    }
3853
3854    #[test]
3855    fn test_roundtrip_list_view_of_dict() {
3856        #[allow(deprecated)]
3857        let list_data_type = DataType::ListView(Arc::new(Field::new_dict(
3858            "item",
3859            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3860            true,
3861            1,
3862            false,
3863        )));
3864        let offsets: &[i32; 5] = &[0, 2, 4, 4, 7];
3865        let sizes: &[i32; 4] = &[2, 2, 0, 3];
3866        test_roundtrip_list_view_of_dict_impl::<i32, i32>(list_data_type, offsets, sizes);
3867    }
3868
3869    #[test]
3870    fn test_roundtrip_large_list_view_of_dict() {
3871        #[allow(deprecated)]
3872        let list_data_type = DataType::LargeListView(Arc::new(Field::new_dict(
3873            "item",
3874            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3875            true,
3876            2,
3877            false,
3878        )));
3879        let offsets: &[i64; 5] = &[0, 2, 4, 4, 7];
3880        let sizes: &[i64; 4] = &[2, 2, 0, 3];
3881        test_roundtrip_list_view_of_dict_impl::<i64, i64>(list_data_type, offsets, sizes);
3882    }
3883
3884    #[test]
3885    fn test_roundtrip_sliced_list_view_of_dict() {
3886        #[allow(deprecated)]
3887        let list_data_type = DataType::ListView(Arc::new(Field::new_dict(
3888            "item",
3889            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3890            true,
3891            3,
3892            false,
3893        )));
3894
3895        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3896        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2, 1, 0, 3, 2, 1]);
3897        let dict_array = DictionaryArray::new(keys, Arc::new(values));
3898        let dict_data = dict_array.to_data();
3899
3900        let offsets: &[i32; 7] = &[0, 2, 4, 4, 7, 9, 12];
3901        let sizes: &[i32; 6] = &[2, 2, 0, 3, 2, 3];
3902        let value_offsets = Buffer::from_slice_ref(offsets);
3903        let value_sizes = Buffer::from_slice_ref(sizes);
3904
3905        let list_data = ArrayData::builder(list_data_type)
3906            .len(6)
3907            .add_buffer(value_offsets)
3908            .add_buffer(value_sizes)
3909            .add_child_data(dict_data)
3910            .build()
3911            .unwrap();
3912        let list_view_array = GenericListViewArray::<i32>::from(list_data);
3913
3914        let schema = Arc::new(Schema::new(vec![Field::new(
3915            "f1",
3916            list_view_array.data_type().clone(),
3917            false,
3918        )]));
3919        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(list_view_array)]).unwrap();
3920
3921        let sliced_batch = input_batch.slice(1, 4);
3922
3923        let output_batch = deserialize_file(serialize_file(&sliced_batch));
3924        assert_eq!(sliced_batch, output_batch);
3925
3926        let output_batch = deserialize_stream(serialize_stream(&sliced_batch));
3927        assert_eq!(sliced_batch, output_batch);
3928    }
3929
3930    #[test]
3931    fn test_roundtrip_dense_union_of_dict() {
3932        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3933        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
3934        let dict_array = DictionaryArray::new(keys, Arc::new(values));
3935
3936        #[allow(deprecated)]
3937        let dict_field = Arc::new(Field::new_dict(
3938            "dict",
3939            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3940            true,
3941            1,
3942            false,
3943        ));
3944        let int_field = Arc::new(Field::new("int", DataType::Int32, false));
3945        let union_fields = UnionFields::try_new(vec![0, 1], vec![dict_field, int_field]).unwrap();
3946
3947        let types = ScalarBuffer::from(vec![0i8, 0, 1, 0, 1, 0, 0]);
3948        let offsets = ScalarBuffer::from(vec![0i32, 1, 0, 2, 1, 3, 4]);
3949
3950        let int_array = Int32Array::from(vec![100, 200]);
3951
3952        let union = UnionArray::try_new(
3953            union_fields.clone(),
3954            types,
3955            Some(offsets),
3956            vec![Arc::new(dict_array), Arc::new(int_array)],
3957        )
3958        .unwrap();
3959
3960        let schema = Arc::new(Schema::new(vec![Field::new(
3961            "union",
3962            DataType::Union(union_fields, UnionMode::Dense),
3963            false,
3964        )]));
3965        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
3966
3967        let output_batch = deserialize_file(serialize_file(&input_batch));
3968        assert_eq!(input_batch, output_batch);
3969
3970        let output_batch = deserialize_stream(serialize_stream(&input_batch));
3971        assert_eq!(input_batch, output_batch);
3972    }
3973
3974    #[test]
3975    fn test_roundtrip_sparse_union_of_dict() {
3976        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3977        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
3978        let dict_array = DictionaryArray::new(keys, Arc::new(values));
3979
3980        #[allow(deprecated)]
3981        let dict_field = Arc::new(Field::new_dict(
3982            "dict",
3983            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3984            true,
3985            2,
3986            false,
3987        ));
3988        let int_field = Arc::new(Field::new("int", DataType::Int32, false));
3989        let union_fields = UnionFields::try_new(vec![0, 1], vec![dict_field, int_field]).unwrap();
3990
3991        let types = ScalarBuffer::from(vec![0i8, 0, 1, 0, 1, 0, 0]);
3992
3993        let int_array = Int32Array::from(vec![0, 0, 100, 0, 200, 0, 0]);
3994
3995        let union = UnionArray::try_new(
3996            union_fields.clone(),
3997            types,
3998            None,
3999            vec![Arc::new(dict_array), Arc::new(int_array)],
4000        )
4001        .unwrap();
4002
4003        let schema = Arc::new(Schema::new(vec![Field::new(
4004            "union",
4005            DataType::Union(union_fields, UnionMode::Sparse),
4006            false,
4007        )]));
4008        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
4009
4010        let output_batch = deserialize_file(serialize_file(&input_batch));
4011        assert_eq!(input_batch, output_batch);
4012
4013        let output_batch = deserialize_stream(serialize_stream(&input_batch));
4014        assert_eq!(input_batch, output_batch);
4015    }
4016
4017    #[test]
4018    fn test_roundtrip_map_with_dict_keys() {
4019        // Building a map array is a bit involved. We first build a struct arary that has a key and
4020        // value field and then use that to build the actual map array.
4021        let key_values = StringArray::from(vec!["key_a", "key_b", "key_c"]);
4022        let keys = Int32Array::from_iter_values([0, 1, 2, 0, 1, 0]);
4023        let dict_keys = DictionaryArray::new(keys, Arc::new(key_values));
4024
4025        let values = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
4026
4027        #[allow(deprecated)]
4028        let entries_field = Arc::new(Field::new(
4029            "entries",
4030            DataType::Struct(
4031                vec![
4032                    Field::new_dict(
4033                        "key",
4034                        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
4035                        false,
4036                        1,
4037                        false,
4038                    ),
4039                    Field::new("value", DataType::Int32, true),
4040                ]
4041                .into(),
4042            ),
4043            false,
4044        ));
4045
4046        let entries = StructArray::from(vec![
4047            (
4048                Arc::new(Field::new(
4049                    "key",
4050                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
4051                    false,
4052                )),
4053                Arc::new(dict_keys) as ArrayRef,
4054            ),
4055            (
4056                Arc::new(Field::new("value", DataType::Int32, true)),
4057                Arc::new(values) as ArrayRef,
4058            ),
4059        ]);
4060
4061        let offsets = Buffer::from_slice_ref([0i32, 2, 4, 6]);
4062
4063        let map_data = ArrayData::builder(DataType::Map(entries_field, false))
4064            .len(3)
4065            .add_buffer(offsets)
4066            .add_child_data(entries.into_data())
4067            .build()
4068            .unwrap();
4069        let map_array = MapArray::from(map_data);
4070
4071        let schema = Arc::new(Schema::new(vec![Field::new(
4072            "map",
4073            map_array.data_type().clone(),
4074            false,
4075        )]));
4076        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(map_array)]).unwrap();
4077
4078        let output_batch = deserialize_file(serialize_file(&input_batch));
4079        assert_eq!(input_batch, output_batch);
4080
4081        let output_batch = deserialize_stream(serialize_stream(&input_batch));
4082        assert_eq!(input_batch, output_batch);
4083    }
4084
4085    #[test]
4086    fn test_roundtrip_map_with_dict_values() {
4087        // Building a map array is a bit involved. We first build a struct arary that has a key and
4088        // value field and then use that to build the actual map array.
4089        let keys = StringArray::from(vec!["a", "b", "c", "d", "e", "f"]);
4090
4091        let value_values = StringArray::from(vec!["val_x", "val_y", "val_z"]);
4092        let value_keys = Int32Array::from_iter_values([0, 1, 2, 0, 1, 0]);
4093        let dict_values = DictionaryArray::new(value_keys, Arc::new(value_values));
4094
4095        #[allow(deprecated)]
4096        let entries_field = Arc::new(Field::new(
4097            "entries",
4098            DataType::Struct(
4099                vec![
4100                    Field::new("key", DataType::Utf8, false),
4101                    Field::new_dict(
4102                        "value",
4103                        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
4104                        true,
4105                        2,
4106                        false,
4107                    ),
4108                ]
4109                .into(),
4110            ),
4111            false,
4112        ));
4113
4114        let entries = StructArray::from(vec![
4115            (
4116                Arc::new(Field::new("key", DataType::Utf8, false)),
4117                Arc::new(keys) as ArrayRef,
4118            ),
4119            (
4120                Arc::new(Field::new(
4121                    "value",
4122                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
4123                    true,
4124                )),
4125                Arc::new(dict_values) as ArrayRef,
4126            ),
4127        ]);
4128
4129        let offsets = Buffer::from_slice_ref([0i32, 2, 4, 6]);
4130
4131        let map_data = ArrayData::builder(DataType::Map(entries_field, false))
4132            .len(3)
4133            .add_buffer(offsets)
4134            .add_child_data(entries.into_data())
4135            .build()
4136            .unwrap();
4137        let map_array = MapArray::from(map_data);
4138
4139        let schema = Arc::new(Schema::new(vec![Field::new(
4140            "map",
4141            map_array.data_type().clone(),
4142            false,
4143        )]));
4144        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(map_array)]).unwrap();
4145
4146        let output_batch = deserialize_file(serialize_file(&input_batch));
4147        assert_eq!(input_batch, output_batch);
4148
4149        let output_batch = deserialize_stream(serialize_stream(&input_batch));
4150        assert_eq!(input_batch, output_batch);
4151    }
4152
4153    #[test]
4154    fn test_decimal128_alignment16_is_sufficient() {
4155        const IPC_ALIGNMENT: usize = 16;
4156
4157        // Test a bunch of different dimensions to ensure alignment is never an issue.
4158        // For example, if we only test `num_cols = 1` then even with alignment 8 this
4159        // test would _happen_ to pass, even though for different dimensions like
4160        // `num_cols = 2` it would fail.
4161        for num_cols in [1, 2, 3, 17, 50, 73, 99] {
4162            let num_rows = (num_cols * 7 + 11) % 100; // Deterministic swizzle
4163
4164            let mut fields = Vec::new();
4165            let mut arrays = Vec::new();
4166            for i in 0..num_cols {
4167                let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
4168                let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
4169                fields.push(field);
4170                arrays.push(Arc::new(array) as Arc<dyn Array>);
4171            }
4172            let schema = Schema::new(fields);
4173            let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();
4174
4175            let mut writer = FileWriter::try_new_with_options(
4176                Vec::new(),
4177                batch.schema_ref(),
4178                IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
4179            )
4180            .unwrap();
4181            writer.write(&batch).unwrap();
4182            writer.finish().unwrap();
4183
4184            let out: Vec<u8> = writer.into_inner().unwrap();
4185
4186            let buffer = Buffer::from_vec(out);
4187            let trailer_start = buffer.len() - 10;
4188            let footer_len =
4189                read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
4190            let footer =
4191                root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
4192
4193            let schema = fb_to_schema(footer.schema().unwrap());
4194
4195            // Importantly we set `require_alignment`, checking that 16-byte alignment is sufficient
4196            // for `read_record_batch` later on to read the data in a zero-copy manner.
4197            let decoder =
4198                FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);
4199
4200            let batches = footer.recordBatches().unwrap();
4201
4202            let block = batches.get(0);
4203            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
4204            let data = buffer.slice_with_length(block.offset() as _, block_len);
4205
4206            let batch2 = decoder.read_record_batch(block, &data).unwrap().unwrap();
4207
4208            assert_eq!(batch, batch2);
4209        }
4210    }
4211
4212    #[test]
4213    fn test_decimal128_alignment8_is_unaligned() {
4214        const IPC_ALIGNMENT: usize = 8;
4215
4216        let num_cols = 2;
4217        let num_rows = 1;
4218
4219        let mut fields = Vec::new();
4220        let mut arrays = Vec::new();
4221        for i in 0..num_cols {
4222            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
4223            let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
4224            fields.push(field);
4225            arrays.push(Arc::new(array) as Arc<dyn Array>);
4226        }
4227        let schema = Schema::new(fields);
4228        let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();
4229
4230        let mut writer = FileWriter::try_new_with_options(
4231            Vec::new(),
4232            batch.schema_ref(),
4233            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
4234        )
4235        .unwrap();
4236        writer.write(&batch).unwrap();
4237        writer.finish().unwrap();
4238
4239        let out: Vec<u8> = writer.into_inner().unwrap();
4240
4241        let buffer = Buffer::from_vec(out);
4242        let trailer_start = buffer.len() - 10;
4243        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
4244        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
4245        let schema = fb_to_schema(footer.schema().unwrap());
4246
4247        // Importantly we set `require_alignment`, otherwise the error later is suppressed due to copying
4248        // to an aligned buffer in `ArrayDataBuilder.build_aligned`.
4249        let decoder =
4250            FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);
4251
4252        let batches = footer.recordBatches().unwrap();
4253
4254        let block = batches.get(0);
4255        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
4256        let data = buffer.slice_with_length(block.offset() as _, block_len);
4257
4258        let result = decoder.read_record_batch(block, &data);
4259
4260        let error = result.unwrap_err();
4261        assert_eq!(
4262            error.to_string(),
4263            "Invalid argument error: Misaligned buffers[0] in array of type Decimal128(38, 10), \
4264             offset from expected alignment of 16 by 8"
4265        );
4266    }
4267
4268    #[test]
4269    fn test_flush() {
4270        // We write a schema which is small enough to fit into a buffer and not get flushed,
4271        // and then force the write with .flush().
4272        let num_cols = 2;
4273        let mut fields = Vec::new();
4274        let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
4275        for i in 0..num_cols {
4276            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
4277            fields.push(field);
4278        }
4279        let schema = Schema::new(fields);
4280        let inner_stream_writer = BufWriter::with_capacity(1024, Vec::new());
4281        let inner_file_writer = BufWriter::with_capacity(1024, Vec::new());
4282        let mut stream_writer =
4283            StreamWriter::try_new_with_options(inner_stream_writer, &schema, options.clone())
4284                .unwrap();
4285        let mut file_writer =
4286            FileWriter::try_new_with_options(inner_file_writer, &schema, options).unwrap();
4287
4288        let stream_bytes_written_on_new = stream_writer.get_ref().get_ref().len();
4289        let file_bytes_written_on_new = file_writer.get_ref().get_ref().len();
4290        stream_writer.flush().unwrap();
4291        file_writer.flush().unwrap();
4292        let stream_bytes_written_on_flush = stream_writer.get_ref().get_ref().len();
4293        let file_bytes_written_on_flush = file_writer.get_ref().get_ref().len();
4294        let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
4295        // Finishing a stream writes the continuation bytes in MetadataVersion::V5 (4 bytes)
4296        // and then a length of 0 (4 bytes) for a total of 8 bytes.
4297        // Everything before that should have been flushed in the .flush() call.
4298        let expected_stream_flushed_bytes = stream_out.len() - 8;
4299        // A file write is the same as the stream write except for the leading magic string
4300        // ARROW1 plus padding, which is 8 bytes.
4301        let expected_file_flushed_bytes = expected_stream_flushed_bytes + 8;
4302
4303        assert!(
4304            stream_bytes_written_on_new < stream_bytes_written_on_flush,
4305            "this test makes no sense if flush is not actually required"
4306        );
4307        assert!(
4308            file_bytes_written_on_new < file_bytes_written_on_flush,
4309            "this test makes no sense if flush is not actually required"
4310        );
4311        assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
4312        assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
4313    }
4314
4315    #[test]
4316    fn test_roundtrip_list_of_fixed_list() -> Result<(), ArrowError> {
4317        let l1_type =
4318            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, false)), 3);
4319        let l2_type = DataType::List(Arc::new(Field::new("item", l1_type.clone(), false)));
4320
4321        let l0_builder = Float32Builder::new();
4322        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3).with_field(Arc::new(Field::new(
4323            "item",
4324            DataType::Float32,
4325            false,
4326        )));
4327        let mut l2_builder =
4328            ListBuilder::new(l1_builder).with_field(Arc::new(Field::new("item", l1_type, false)));
4329
4330        for point in [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]] {
4331            l2_builder.values().values().append_value(point[0]);
4332            l2_builder.values().values().append_value(point[1]);
4333            l2_builder.values().values().append_value(point[2]);
4334
4335            l2_builder.values().append(true);
4336        }
4337        l2_builder.append(true);
4338
4339        let point = [10., 11., 12.];
4340        l2_builder.values().values().append_value(point[0]);
4341        l2_builder.values().values().append_value(point[1]);
4342        l2_builder.values().values().append_value(point[2]);
4343
4344        l2_builder.values().append(true);
4345        l2_builder.append(true);
4346
4347        let array = Arc::new(l2_builder.finish()) as ArrayRef;
4348
4349        let schema = Arc::new(Schema::new_with_metadata(
4350            vec![Field::new("points", l2_type, false)],
4351            HashMap::default(),
4352        ));
4353
4354        // Test a variety of combinations that include 0 and non-zero offsets
4355        // and also portions or the rest of the array
4356        test_slices(&array, &schema, 0, 1)?;
4357        test_slices(&array, &schema, 0, 2)?;
4358        test_slices(&array, &schema, 1, 1)?;
4359
4360        Ok(())
4361    }
4362
4363    #[test]
4364    fn test_roundtrip_list_of_fixed_list_w_nulls() -> Result<(), ArrowError> {
4365        let l0_builder = Float32Builder::new();
4366        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3);
4367        let mut l2_builder = ListBuilder::new(l1_builder);
4368
4369        for point in [
4370            [Some(1.0), Some(2.0), None],
4371            [Some(4.0), Some(5.0), Some(6.0)],
4372            [None, Some(8.0), Some(9.0)],
4373        ] {
4374            for p in point {
4375                match p {
4376                    Some(p) => l2_builder.values().values().append_value(p),
4377                    None => l2_builder.values().values().append_null(),
4378                }
4379            }
4380
4381            l2_builder.values().append(true);
4382        }
4383        l2_builder.append(true);
4384
4385        let point = [Some(10.), None, None];
4386        for p in point {
4387            match p {
4388                Some(p) => l2_builder.values().values().append_value(p),
4389                None => l2_builder.values().values().append_null(),
4390            }
4391        }
4392
4393        l2_builder.values().append(true);
4394        l2_builder.append(true);
4395
4396        let array = Arc::new(l2_builder.finish()) as ArrayRef;
4397
4398        let schema = Arc::new(Schema::new_with_metadata(
4399            vec![Field::new(
4400                "points",
4401                DataType::List(Arc::new(Field::new(
4402                    "item",
4403                    DataType::FixedSizeList(
4404                        Arc::new(Field::new("item", DataType::Float32, true)),
4405                        3,
4406                    ),
4407                    true,
4408                ))),
4409                true,
4410            )],
4411            HashMap::default(),
4412        ));
4413
4414        // Test a variety of combinations that include 0 and non-zero offsets
4415        // and also portions or the rest of the array
4416        test_slices(&array, &schema, 0, 1)?;
4417        test_slices(&array, &schema, 0, 2)?;
4418        test_slices(&array, &schema, 1, 1)?;
4419
4420        Ok(())
4421    }
4422
4423    fn test_slices(
4424        parent_array: &ArrayRef,
4425        schema: &SchemaRef,
4426        offset: usize,
4427        length: usize,
4428    ) -> Result<(), ArrowError> {
4429        let subarray = parent_array.slice(offset, length);
4430        let original_batch = RecordBatch::try_new(schema.clone(), vec![subarray])?;
4431
4432        let mut bytes = Vec::new();
4433        let mut writer = StreamWriter::try_new(&mut bytes, schema)?;
4434        writer.write(&original_batch)?;
4435        writer.finish()?;
4436
4437        let mut cursor = std::io::Cursor::new(bytes);
4438        let mut reader = StreamReader::try_new(&mut cursor, None)?;
4439        let returned_batch = reader.next().unwrap()?;
4440
4441        assert_eq!(original_batch, returned_batch);
4442
4443        Ok(())
4444    }
4445
4446    #[test]
4447    fn test_roundtrip_fixed_list() -> Result<(), ArrowError> {
4448        let int_builder = Int64Builder::new();
4449        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3)
4450            .with_field(Arc::new(Field::new("item", DataType::Int64, false)));
4451
4452        for point in [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]] {
4453            fixed_list_builder.values().append_value(point[0]);
4454            fixed_list_builder.values().append_value(point[1]);
4455            fixed_list_builder.values().append_value(point[2]);
4456
4457            fixed_list_builder.append(true);
4458        }
4459
4460        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;
4461
4462        let schema = Arc::new(Schema::new_with_metadata(
4463            vec![Field::new(
4464                "points",
4465                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, false)), 3),
4466                false,
4467            )],
4468            HashMap::default(),
4469        ));
4470
4471        // Test a variety of combinations that include 0 and non-zero offsets
4472        // and also portions or the rest of the array
4473        test_slices(&array, &schema, 0, 4)?;
4474        test_slices(&array, &schema, 0, 2)?;
4475        test_slices(&array, &schema, 1, 3)?;
4476        test_slices(&array, &schema, 2, 1)?;
4477
4478        Ok(())
4479    }
4480
4481    #[test]
4482    fn test_roundtrip_fixed_list_w_nulls() -> Result<(), ArrowError> {
4483        let int_builder = Int64Builder::new();
4484        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3);
4485
4486        for point in [
4487            [Some(1), Some(2), None],
4488            [Some(4), Some(5), Some(6)],
4489            [None, Some(8), Some(9)],
4490            [Some(10), None, None],
4491        ] {
4492            for p in point {
4493                match p {
4494                    Some(p) => fixed_list_builder.values().append_value(p),
4495                    None => fixed_list_builder.values().append_null(),
4496                }
4497            }
4498
4499            fixed_list_builder.append(true);
4500        }
4501
4502        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;
4503
4504        let schema = Arc::new(Schema::new_with_metadata(
4505            vec![Field::new(
4506                "points",
4507                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 3),
4508                true,
4509            )],
4510            HashMap::default(),
4511        ));
4512
4513        // Test a variety of combinations that include 0 and non-zero offsets
4514        // and also portions or the rest of the array
4515        test_slices(&array, &schema, 0, 4)?;
4516        test_slices(&array, &schema, 0, 2)?;
4517        test_slices(&array, &schema, 1, 3)?;
4518        test_slices(&array, &schema, 2, 1)?;
4519
4520        Ok(())
4521    }
4522
4523    #[test]
4524    fn test_metadata_encoding_ordering() {
4525        fn create_hash() -> u64 {
4526            let metadata: HashMap<String, String> = [
4527                ("a", "1"), //
4528                ("b", "2"), //
4529                ("c", "3"), //
4530                ("d", "4"), //
4531                ("e", "5"), //
4532            ]
4533            .into_iter()
4534            .map(|(k, v)| (k.to_owned(), v.to_owned()))
4535            .collect();
4536
4537            // Set metadata on both the schema and a field within it.
4538            let schema = Arc::new(
4539                Schema::new(vec![
4540                    Field::new("a", DataType::Int64, true).with_metadata(metadata.clone()),
4541                ])
4542                .with_metadata(metadata)
4543                .clone(),
4544            );
4545            let batch = RecordBatch::new_empty(schema.clone());
4546
4547            let mut bytes = Vec::new();
4548            let mut w = StreamWriter::try_new(&mut bytes, batch.schema_ref()).unwrap();
4549            w.write(&batch).unwrap();
4550            w.finish().unwrap();
4551
4552            let mut h = std::hash::DefaultHasher::new();
4553            h.write(&bytes);
4554            h.finish()
4555        }
4556
4557        let expected = create_hash();
4558
4559        // Since there is randomness in the HashMap and we cannot specify our
4560        // own Hasher for the implementation used for metadata, run the above
4561        // code 20x and verify it does not change. This is not perfect but it
4562        // should be good enough.
4563        let all_passed = (0..20).all(|_| create_hash() == expected);
4564        assert!(all_passed);
4565    }
4566
4567    #[test]
4568    fn test_dictionary_tracker_reset() {
4569        let data_gen = IpcDataGenerator::default();
4570        let mut dictionary_tracker = DictionaryTracker::new(false);
4571        let writer_options = IpcWriteOptions::default();
4572        let mut compression_ctx = IpcWriteContext::default();
4573
4574        let schema = Arc::new(Schema::new(vec![Field::new(
4575            "a",
4576            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
4577            false,
4578        )]));
4579
4580        let mut write_single_batch_stream =
4581            |batch: RecordBatch, dict_tracker: &mut DictionaryTracker| -> Vec<u8> {
4582                let mut buffer = Vec::new();
4583
4584                // create a new IPC stream:
4585                let stream_header = data_gen.schema_to_bytes_with_dictionary_tracker(
4586                    &schema,
4587                    dict_tracker,
4588                    &writer_options,
4589                );
4590                _ = write_message(&mut buffer, stream_header, &writer_options).unwrap();
4591
4592                let (encoded_dicts, encoded_batch) = data_gen
4593                    .encode(&batch, dict_tracker, &writer_options, &mut compression_ctx)
4594                    .unwrap();
4595                for encoded_dict in encoded_dicts {
4596                    _ = write_message(&mut buffer, encoded_dict, &writer_options).unwrap();
4597                }
4598                _ = write_message(&mut buffer, encoded_batch, &writer_options).unwrap();
4599
4600                buffer
4601            };
4602
4603        let batch1 = RecordBatch::try_new(
4604            schema.clone(),
4605            vec![Arc::new(DictionaryArray::new(
4606                UInt8Array::from_iter_values([0]),
4607                Arc::new(StringArray::from_iter_values(["a"])),
4608            ))],
4609        )
4610        .unwrap();
4611        let buffer = write_single_batch_stream(batch1.clone(), &mut dictionary_tracker);
4612
4613        // ensure we can read the stream back
4614        let mut reader = StreamReader::try_new(Cursor::new(buffer), None).unwrap();
4615        let read_batch = reader.next().unwrap().unwrap();
4616        assert_eq!(read_batch, batch1);
4617
4618        // reset the dictionary tracker so it can be used for next stream
4619        dictionary_tracker.clear();
4620
4621        // now write a 2nd stream and ensure we can also read it:
4622        let batch2 = RecordBatch::try_new(
4623            schema.clone(),
4624            vec![Arc::new(DictionaryArray::new(
4625                UInt8Array::from_iter_values([0]),
4626                Arc::new(StringArray::from_iter_values(["a"])),
4627            ))],
4628        )
4629        .unwrap();
4630        let buffer = write_single_batch_stream(batch2.clone(), &mut dictionary_tracker);
4631        let mut reader = StreamReader::try_new(Cursor::new(buffer), None).unwrap();
4632        let read_batch = reader.next().unwrap().unwrap();
4633        assert_eq!(read_batch, batch2);
4634    }
4635}