arrow_ipc/writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Arrow IPC File and Stream Writers
19//!
20//! # Notes
21//!
22//! [`FileWriter`] and [`StreamWriter`] have similar interfaces,
23//! however the file produced by [`FileWriter`] must be read back with a reader that supports [`Seek`]ing
24//!
25//! [`Seek`]: std::io::Seek
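//!
//! # Example
//!
//! A minimal, illustrative sketch of writing one batch with each writer, using
//! in-memory `Vec<u8>` buffers as the output:
//!
//! ```
//! # use arrow_array::record_batch;
//! # use arrow_ipc::writer::{FileWriter, StreamWriter};
//! let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
//!
//! // File format: supports random access, `finish` writes the footer
//! let mut file = Vec::<u8>::new();
//! let mut writer = FileWriter::try_new(&mut file, &batch.schema()).unwrap();
//! writer.write(&batch).unwrap();
//! writer.finish().unwrap();
//!
//! // Stream format: sequential, suitable for sockets and pipes
//! let mut stream = Vec::<u8>::new();
//! let mut writer = StreamWriter::try_new(&mut stream, &batch.schema()).unwrap();
//! writer.write(&batch).unwrap();
//! writer.finish().unwrap();
//! ```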
26
27use std::cmp::min;
28use std::collections::HashMap;
29use std::io::{BufWriter, Write};
30use std::mem::size_of;
31use std::sync::Arc;
32
33use flatbuffers::FlatBufferBuilder;
34
35use arrow_array::builder::BufferBuilder;
36use arrow_array::cast::*;
37use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
38use arrow_array::*;
39use arrow_buffer::bit_util;
40use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
41use arrow_data::{ArrayData, ArrayDataBuilder, BufferSpec, layout};
42use arrow_schema::*;
43
44use crate::CONTINUATION_MARKER;
45use crate::compression::CompressionCodec;
46pub use crate::compression::CompressionContext;
47use crate::convert::IpcSchemaEncoder;
48
49/// IPC write options used to control the behaviour of the [`IpcDataGenerator`]
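///
/// # Example
///
/// A minimal, illustrative sketch: explicit alignment and metadata version via
/// [`IpcWriteOptions::try_new`], optional buffer compression via
/// [`IpcWriteOptions::try_with_compression`] (the matching cargo feature, e.g. `lz4`,
/// must be enabled or writing will fail at runtime), and delta dictionary handling:
///
/// ```
/// # use arrow_ipc::writer::{DictionaryHandling, IpcWriteOptions};
/// # use arrow_ipc::{CompressionType, MetadataVersion};
/// # fn main() -> Result<(), arrow_schema::ArrowError> {
/// let options = IpcWriteOptions::try_new(64, false, MetadataVersion::V5)?
///     .try_with_compression(Some(CompressionType::LZ4_FRAME))?
///     .with_dictionary_handling(DictionaryHandling::Delta);
/// # Ok(())
/// # }
/// ```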
50#[derive(Debug, Clone)]
51pub struct IpcWriteOptions {
52    /// Write padding after each memory buffer so its length is a multiple of this many bytes.
53    /// Must be 8, 16, 32, or 64 - defaults to 64.
54    alignment: u8,
55    /// The legacy format is for releases before 0.15.0, and uses metadata V4
56    write_legacy_ipc_format: bool,
57    /// The metadata version to write. The Rust IPC writer supports V4+
58    ///
59    /// *Default versions per crate*
60    ///
61    /// When creating the default IpcWriteOptions, the following metadata versions are used:
62    ///
63    /// version 2.0.0: V4, with legacy format enabled
64    /// version 4.0.0: V5
65    metadata_version: crate::MetadataVersion,
66    /// Compression, if desired. Will result in a runtime error
67    /// if the corresponding feature is not enabled
68    batch_compression_type: Option<crate::CompressionType>,
69    /// How to handle updating dictionaries in IPC messages
70    dictionary_handling: DictionaryHandling,
71}
72
73impl IpcWriteOptions {
74    /// Configures compression when writing IPC files.
75    ///
76    /// Will result in a runtime error if the corresponding feature
77    /// is not enabled
78    pub fn try_with_compression(
79        mut self,
80        batch_compression_type: Option<crate::CompressionType>,
81    ) -> Result<Self, ArrowError> {
82        self.batch_compression_type = batch_compression_type;
83
84        if self.batch_compression_type.is_some()
85            && self.metadata_version < crate::MetadataVersion::V5
86        {
87            return Err(ArrowError::InvalidArgumentError(
88                "Compression only supported in metadata v5 and above".to_string(),
89            ));
90        }
91        Ok(self)
92    }
93    /// Try to create IpcWriteOptions, checking for incompatible settings
94    pub fn try_new(
95        alignment: usize,
96        write_legacy_ipc_format: bool,
97        metadata_version: crate::MetadataVersion,
98    ) -> Result<Self, ArrowError> {
99        let is_alignment_valid =
100            alignment == 8 || alignment == 16 || alignment == 32 || alignment == 64;
101        if !is_alignment_valid {
102            return Err(ArrowError::InvalidArgumentError(
103                "Alignment should be 8, 16, 32, or 64.".to_string(),
104            ));
105        }
106        let alignment: u8 = u8::try_from(alignment).expect("range already checked");
107        match metadata_version {
108            crate::MetadataVersion::V1
109            | crate::MetadataVersion::V2
110            | crate::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
111                "Writing IPC metadata version 3 and lower not supported".to_string(),
112            )),
113            #[allow(deprecated)]
114            crate::MetadataVersion::V4 => Ok(Self {
115                alignment,
116                write_legacy_ipc_format,
117                metadata_version,
118                batch_compression_type: None,
119                dictionary_handling: DictionaryHandling::default(),
120            }),
121            crate::MetadataVersion::V5 => {
122                if write_legacy_ipc_format {
123                    Err(ArrowError::InvalidArgumentError(
124                        "Legacy IPC format only supported on metadata version 4".to_string(),
125                    ))
126                } else {
127                    Ok(Self {
128                        alignment,
129                        write_legacy_ipc_format,
130                        metadata_version,
131                        batch_compression_type: None,
132                        dictionary_handling: DictionaryHandling::default(),
133                    })
134                }
135            }
136            z => Err(ArrowError::InvalidArgumentError(format!(
137                "Unsupported crate::MetadataVersion {z:?}"
138            ))),
139        }
140    }
141
142    /// Configure how dictionaries are handled in IPC messages
143    pub fn with_dictionary_handling(mut self, dictionary_handling: DictionaryHandling) -> Self {
144        self.dictionary_handling = dictionary_handling;
145        self
146    }
147}
148
149impl Default for IpcWriteOptions {
150    fn default() -> Self {
151        Self {
152            alignment: 64,
153            write_legacy_ipc_format: false,
154            metadata_version: crate::MetadataVersion::V5,
155            batch_compression_type: None,
156            dictionary_handling: DictionaryHandling::default(),
157        }
158    }
159}
160
161#[derive(Debug, Default)]
162/// Handles low level details of encoding [`Array`] and [`Schema`] into the
163/// [Arrow IPC Format].
164///
165/// # Example
166/// ```
167/// # fn run() {
168/// # use std::sync::Arc;
169/// # use arrow_array::UInt64Array;
170/// # use arrow_array::RecordBatch;
171/// # use arrow_ipc::writer::{CompressionContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
172///
173/// // Create a record batch
174/// let batch = RecordBatch::try_from_iter(vec![
175///  ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _)
176/// ]).unwrap();
177///
178/// // Error if dictionaries are replaced.
179/// let error_on_replacement = true;
180/// let options = IpcWriteOptions::default();
181/// let mut dictionary_tracker = DictionaryTracker::new(error_on_replacement);
182///
183/// let mut compression_context = CompressionContext::default();
184///
185/// // encode the batch into zero or more encoded dictionaries
186/// // and the data for the actual array.
187/// let data_gen = IpcDataGenerator::default();
188/// let (encoded_dictionaries, encoded_message) = data_gen
189///   .encode(&batch, &mut dictionary_tracker, &options, &mut compression_context)
190///   .unwrap();
191/// # }
192/// ```
193///
194/// [Arrow IPC Format]: https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc
195pub struct IpcDataGenerator {}
196
197impl IpcDataGenerator {
198    /// Converts a schema to an IPC message along with `dictionary_tracker`
199    /// and returns it encoded inside [EncodedData] as a flatbuffer.
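    ///
    /// A minimal, illustrative sketch pairing this with a fresh [`DictionaryTracker`]
    /// (the schema used here is hypothetical):
    ///
    /// ```
    /// # use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
    /// # use arrow_schema::{DataType, Field, Schema};
    /// let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    /// let data_gen = IpcDataGenerator::default();
    /// let mut tracker = DictionaryTracker::new(true);
    /// let encoded = data_gen.schema_to_bytes_with_dictionary_tracker(
    ///     &schema,
    ///     &mut tracker,
    ///     &IpcWriteOptions::default(),
    /// );
    /// // A schema message is all header: it carries a flatbuffer but no body bytes
    /// assert!(encoded.arrow_data.is_empty());
    /// ```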
200    pub fn schema_to_bytes_with_dictionary_tracker(
201        &self,
202        schema: &Schema,
203        dictionary_tracker: &mut DictionaryTracker,
204        write_options: &IpcWriteOptions,
205    ) -> EncodedData {
206        let mut fbb = FlatBufferBuilder::new();
207        let schema = {
208            let fb = IpcSchemaEncoder::new()
209                .with_dictionary_tracker(dictionary_tracker)
210                .schema_to_fb_offset(&mut fbb, schema);
211            fb.as_union_value()
212        };
213
214        let mut message = crate::MessageBuilder::new(&mut fbb);
215        message.add_version(write_options.metadata_version);
216        message.add_header_type(crate::MessageHeader::Schema);
217        message.add_bodyLength(0);
218        message.add_header(schema);
219        // TODO: custom metadata
220        let data = message.finish();
221        fbb.finish(data, None);
222
223        let data = fbb.finished_data();
224        EncodedData {
225            ipc_message: data.to_vec(),
226            arrow_data: vec![],
227        }
228    }
229
230    fn _encode_dictionaries<I: Iterator<Item = i64>>(
231        &self,
232        column: &ArrayRef,
233        encoded_dictionaries: &mut Vec<EncodedData>,
234        dictionary_tracker: &mut DictionaryTracker,
235        write_options: &IpcWriteOptions,
236        dict_id: &mut I,
237        compression_context: &mut CompressionContext,
238    ) -> Result<(), ArrowError> {
239        match column.data_type() {
240            DataType::Struct(fields) => {
241                let s = as_struct_array(column);
242                for (field, column) in fields.iter().zip(s.columns()) {
243                    self.encode_dictionaries(
244                        field,
245                        column,
246                        encoded_dictionaries,
247                        dictionary_tracker,
248                        write_options,
249                        dict_id,
250                        compression_context,
251                    )?;
252                }
253            }
254            DataType::RunEndEncoded(_, values) => {
255                let data = column.to_data();
256                if data.child_data().len() != 2 {
257                    return Err(ArrowError::InvalidArgumentError(format!(
258                        "The run encoded array should have exactly two child arrays. Found {}",
259                        data.child_data().len()
260                    )));
261                }
262                // The run_ends array is not expected to be dictionary encoded. Hence encode dictionaries
263                // only for the values array.
264                let values_array = make_array(data.child_data()[1].clone());
265                self.encode_dictionaries(
266                    values,
267                    &values_array,
268                    encoded_dictionaries,
269                    dictionary_tracker,
270                    write_options,
271                    dict_id,
272                    compression_context,
273                )?;
274            }
275            DataType::List(field) => {
276                let list = as_list_array(column);
277                self.encode_dictionaries(
278                    field,
279                    list.values(),
280                    encoded_dictionaries,
281                    dictionary_tracker,
282                    write_options,
283                    dict_id,
284                    compression_context,
285                )?;
286            }
287            DataType::LargeList(field) => {
288                let list = as_large_list_array(column);
289                self.encode_dictionaries(
290                    field,
291                    list.values(),
292                    encoded_dictionaries,
293                    dictionary_tracker,
294                    write_options,
295                    dict_id,
296                    compression_context,
297                )?;
298            }
299            DataType::FixedSizeList(field, _) => {
300                let list = column
301                    .as_any()
302                    .downcast_ref::<FixedSizeListArray>()
303                    .expect("Unable to downcast to fixed size list array");
304                self.encode_dictionaries(
305                    field,
306                    list.values(),
307                    encoded_dictionaries,
308                    dictionary_tracker,
309                    write_options,
310                    dict_id,
311                    compression_context,
312                )?;
313            }
314            DataType::Map(field, _) => {
315                let map_array = as_map_array(column);
316
317                let (keys, values) = match field.data_type() {
318                    DataType::Struct(fields) if fields.len() == 2 => (&fields[0], &fields[1]),
319                    _ => panic!("Incorrect field data type {:?}", field.data_type()),
320                };
321
322                // keys
323                self.encode_dictionaries(
324                    keys,
325                    map_array.keys(),
326                    encoded_dictionaries,
327                    dictionary_tracker,
328                    write_options,
329                    dict_id,
330                    compression_context,
331                )?;
332
333                // values
334                self.encode_dictionaries(
335                    values,
336                    map_array.values(),
337                    encoded_dictionaries,
338                    dictionary_tracker,
339                    write_options,
340                    dict_id,
341                    compression_context,
342                )?;
343            }
344            DataType::Union(fields, _) => {
345                let union = as_union_array(column);
346                for (type_id, field) in fields.iter() {
347                    let column = union.child(type_id);
348                    self.encode_dictionaries(
349                        field,
350                        column,
351                        encoded_dictionaries,
352                        dictionary_tracker,
353                        write_options,
354                        dict_id,
355                        compression_context,
356                    )?;
357                }
358            }
359            _ => (),
360        }
361
362        Ok(())
363    }
364
365    #[allow(clippy::too_many_arguments)]
366    fn encode_dictionaries<I: Iterator<Item = i64>>(
367        &self,
368        field: &Field,
369        column: &ArrayRef,
370        encoded_dictionaries: &mut Vec<EncodedData>,
371        dictionary_tracker: &mut DictionaryTracker,
372        write_options: &IpcWriteOptions,
373        dict_id_seq: &mut I,
374        compression_context: &mut CompressionContext,
375    ) -> Result<(), ArrowError> {
376        match column.data_type() {
377            DataType::Dictionary(_key_type, _value_type) => {
378                let dict_data = column.to_data();
379                let dict_values = &dict_data.child_data()[0];
380
381                let values = make_array(dict_data.child_data()[0].clone());
382
383                self._encode_dictionaries(
384                    &values,
385                    encoded_dictionaries,
386                    dictionary_tracker,
387                    write_options,
388                    dict_id_seq,
389                    compression_context,
390                )?;
391
392                // It's important to only take the dict_id at this point, because the dict ID
393                // sequence is assigned depth-first, so we need to first encode children and have
394                // them take their assigned dict IDs before we take the dict ID for this field.
395                let dict_id = dict_id_seq.next().ok_or_else(|| {
396                    ArrowError::IpcError(format!("no dict id for field {}", field.name()))
397                })?;
398
399                match dictionary_tracker.insert_column(
400                    dict_id,
401                    column,
402                    write_options.dictionary_handling,
403                )? {
404                    DictionaryUpdate::None => {}
405                    DictionaryUpdate::New | DictionaryUpdate::Replaced => {
406                        encoded_dictionaries.push(self.dictionary_batch_to_bytes(
407                            dict_id,
408                            dict_values,
409                            write_options,
410                            false,
411                            compression_context,
412                        )?);
413                    }
414                    DictionaryUpdate::Delta(data) => {
415                        encoded_dictionaries.push(self.dictionary_batch_to_bytes(
416                            dict_id,
417                            &data,
418                            write_options,
419                            true,
420                            compression_context,
421                        )?);
422                    }
423                }
424            }
425            _ => self._encode_dictionaries(
426                column,
427                encoded_dictionaries,
428                dictionary_tracker,
429                write_options,
430                dict_id_seq,
431                compression_context,
432            )?,
433        }
434
435        Ok(())
436    }
437
438    /// Encodes a batch to a number of [EncodedData] items (dictionary batches + the record batch).
439    /// The [DictionaryTracker] keeps track of dictionaries with new `dict_id`s (so they are only sent once).
440    /// Make sure the [DictionaryTracker] is initialized at the start of the stream.
441    pub fn encode(
442        &self,
443        batch: &RecordBatch,
444        dictionary_tracker: &mut DictionaryTracker,
445        write_options: &IpcWriteOptions,
446        compression_context: &mut CompressionContext,
447    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
448        let schema = batch.schema();
449        let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len());
450
451        let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter();
452
453        for (i, field) in schema.fields().iter().enumerate() {
454            let column = batch.column(i);
455            self.encode_dictionaries(
456                field,
457                column,
458                &mut encoded_dictionaries,
459                dictionary_tracker,
460                write_options,
461                &mut dict_id,
462                compression_context,
463            )?;
464        }
465
466        let encoded_message =
467            self.record_batch_to_bytes(batch, write_options, compression_context)?;
468        Ok((encoded_dictionaries, encoded_message))
469    }
470
471    /// Encodes a batch to a number of [EncodedData] items (dictionary batches + the record batch).
472    /// The [DictionaryTracker] keeps track of dictionaries with new `dict_id`s (so they are only sent once).
473    /// Make sure the [DictionaryTracker] is initialized at the start of the stream.
474    #[deprecated(since = "57.0.0", note = "Use `encode` instead")]
475    pub fn encoded_batch(
476        &self,
477        batch: &RecordBatch,
478        dictionary_tracker: &mut DictionaryTracker,
479        write_options: &IpcWriteOptions,
480    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
481        self.encode(
482            batch,
483            dictionary_tracker,
484            write_options,
485            &mut Default::default(),
486        )
487    }
488
489    /// Write a `RecordBatch` into two sets of bytes, one for the header (crate::Message) and the
490    /// other for the batch's data
491    fn record_batch_to_bytes(
492        &self,
493        batch: &RecordBatch,
494        write_options: &IpcWriteOptions,
495        compression_context: &mut CompressionContext,
496    ) -> Result<EncodedData, ArrowError> {
497        let mut fbb = FlatBufferBuilder::new();
498
499        let mut nodes: Vec<crate::FieldNode> = vec![];
500        let mut buffers: Vec<crate::Buffer> = vec![];
501        let mut arrow_data: Vec<u8> = vec![];
502        let mut offset = 0;
503
504        // get the type of compression
505        let batch_compression_type = write_options.batch_compression_type;
506
507        let compression = batch_compression_type.map(|batch_compression_type| {
508            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
509            c.add_method(crate::BodyCompressionMethod::BUFFER);
510            c.add_codec(batch_compression_type);
511            c.finish()
512        });
513
514        let compression_codec: Option<CompressionCodec> =
515            batch_compression_type.map(TryInto::try_into).transpose()?;
516
517        let mut variadic_buffer_counts = vec![];
518
519        for array in batch.columns() {
520            let array_data = array.to_data();
521            offset = write_array_data(
522                &array_data,
523                &mut buffers,
524                &mut arrow_data,
525                &mut nodes,
526                offset,
527                array.len(),
528                array.null_count(),
529                compression_codec,
530                compression_context,
531                write_options,
532            )?;
533
534            append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data);
535        }
536        // pad the tail of body data
537        let len = arrow_data.len();
538        let pad_len = pad_to_alignment(write_options.alignment, len);
539        arrow_data.extend_from_slice(&PADDING[..pad_len]);
540
541        // write data
542        let buffers = fbb.create_vector(&buffers);
543        let nodes = fbb.create_vector(&nodes);
544        let variadic_buffer = if variadic_buffer_counts.is_empty() {
545            None
546        } else {
547            Some(fbb.create_vector(&variadic_buffer_counts))
548        };
549
550        let root = {
551            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
552            batch_builder.add_length(batch.num_rows() as i64);
553            batch_builder.add_nodes(nodes);
554            batch_builder.add_buffers(buffers);
555            if let Some(c) = compression {
556                batch_builder.add_compression(c);
557            }
558
559            if let Some(v) = variadic_buffer {
560                batch_builder.add_variadicBufferCounts(v);
561            }
562            let b = batch_builder.finish();
563            b.as_union_value()
564        };
565        // create a crate::Message
566        let mut message = crate::MessageBuilder::new(&mut fbb);
567        message.add_version(write_options.metadata_version);
568        message.add_header_type(crate::MessageHeader::RecordBatch);
569        message.add_bodyLength(arrow_data.len() as i64);
570        message.add_header(root);
571        let root = message.finish();
572        fbb.finish(root, None);
573        let finished_data = fbb.finished_data();
574
575        Ok(EncodedData {
576            ipc_message: finished_data.to_vec(),
577            arrow_data,
578        })
579    }
580
581    /// Write dictionary values into two sets of bytes, one for the header (crate::Message) and the
582    /// other for the data
583    fn dictionary_batch_to_bytes(
584        &self,
585        dict_id: i64,
586        array_data: &ArrayData,
587        write_options: &IpcWriteOptions,
588        is_delta: bool,
589        compression_context: &mut CompressionContext,
590    ) -> Result<EncodedData, ArrowError> {
591        let mut fbb = FlatBufferBuilder::new();
592
593        let mut nodes: Vec<crate::FieldNode> = vec![];
594        let mut buffers: Vec<crate::Buffer> = vec![];
595        let mut arrow_data: Vec<u8> = vec![];
596
597        // get the type of compression
598        let batch_compression_type = write_options.batch_compression_type;
599
600        let compression = batch_compression_type.map(|batch_compression_type| {
601            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
602            c.add_method(crate::BodyCompressionMethod::BUFFER);
603            c.add_codec(batch_compression_type);
604            c.finish()
605        });
606
607        let compression_codec: Option<CompressionCodec> = batch_compression_type
608            .map(|batch_compression_type| batch_compression_type.try_into())
609            .transpose()?;
610
611        write_array_data(
612            array_data,
613            &mut buffers,
614            &mut arrow_data,
615            &mut nodes,
616            0,
617            array_data.len(),
618            array_data.null_count(),
619            compression_codec,
620            compression_context,
621            write_options,
622        )?;
623
624        let mut variadic_buffer_counts = vec![];
625        append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);
626
627        // pad the tail of body data
628        let len = arrow_data.len();
629        let pad_len = pad_to_alignment(write_options.alignment, len);
630        arrow_data.extend_from_slice(&PADDING[..pad_len]);
631
632        // write data
633        let buffers = fbb.create_vector(&buffers);
634        let nodes = fbb.create_vector(&nodes);
635        let variadic_buffer = if variadic_buffer_counts.is_empty() {
636            None
637        } else {
638            Some(fbb.create_vector(&variadic_buffer_counts))
639        };
640
641        let root = {
642            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
643            batch_builder.add_length(array_data.len() as i64);
644            batch_builder.add_nodes(nodes);
645            batch_builder.add_buffers(buffers);
646            if let Some(c) = compression {
647                batch_builder.add_compression(c);
648            }
649            if let Some(v) = variadic_buffer {
650                batch_builder.add_variadicBufferCounts(v);
651            }
652            batch_builder.finish()
653        };
654
655        let root = {
656            let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut fbb);
657            batch_builder.add_id(dict_id);
658            batch_builder.add_data(root);
659            batch_builder.add_isDelta(is_delta);
660            batch_builder.finish().as_union_value()
661        };
662
663        let root = {
664            let mut message_builder = crate::MessageBuilder::new(&mut fbb);
665            message_builder.add_version(write_options.metadata_version);
666            message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
667            message_builder.add_bodyLength(arrow_data.len() as i64);
668            message_builder.add_header(root);
669            message_builder.finish()
670        };
671
672        fbb.finish(root, None);
673        let finished_data = fbb.finished_data();
674
675        Ok(EncodedData {
676            ipc_message: finished_data.to_vec(),
677            arrow_data,
678        })
679    }
680}
681
682fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
683    match array.data_type() {
684        DataType::BinaryView | DataType::Utf8View => {
685            // The spec documents that the counts include only the variadic buffers, not the view/null buffers.
686            // https://arrow.apache.org/docs/format/Columnar.html#variadic-buffers
687            counts.push(array.buffers().len() as i64 - 1);
688        }
689        DataType::Dictionary(_, _) => {
690            // Do nothing
691            // Dictionary types are handled in `encode_dictionaries`.
692        }
693        _ => {
694            for child in array.child_data() {
695                append_variadic_buffer_counts(counts, child)
696            }
697        }
698    }
699}
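// Worked example (illustrative): a `StringViewArray` whose values spill into two
// data buffers has `buffers() = [views, data0, data1]`, so the variadic count
// recorded for it is `3 - 1 = 2`; the validity bitmap is stored separately and is
// never counted.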
700
701pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError> {
702    match arr.data_type() {
703        DataType::RunEndEncoded(k, _) => match k.data_type() {
704            DataType::Int16 => {
705                Ok(into_zero_offset_run_array(RunArray::<Int16Type>::from(arr))?.into_data())
706            }
707            DataType::Int32 => {
708                Ok(into_zero_offset_run_array(RunArray::<Int32Type>::from(arr))?.into_data())
709            }
710            DataType::Int64 => {
711                Ok(into_zero_offset_run_array(RunArray::<Int64Type>::from(arr))?.into_data())
712            }
713            d => unreachable!("Unexpected data type {d}"),
714        },
715        d => Err(ArrowError::InvalidArgumentError(format!(
716            "The given array is not a run array. Data type of given array: {d}"
717        ))),
718    }
719}
720
721// Returns a `RunArray` with zero offset and length matching the last value
722// in run_ends array.
723fn into_zero_offset_run_array<R: RunEndIndexType>(
724    run_array: RunArray<R>,
725) -> Result<RunArray<R>, ArrowError> {
726    let run_ends = run_array.run_ends();
727    if run_ends.offset() == 0 && run_ends.max_value() == run_ends.len() {
728        return Ok(run_array);
729    }
730
731    // The physical index of the original run_ends array from which the `ArrayData` is sliced.
732    let start_physical_index = run_ends.get_start_physical_index();
733
734    // The physical index of the original run_ends array until which the `ArrayData` is sliced.
735    let end_physical_index = run_ends.get_end_physical_index();
736
737    let physical_length = end_physical_index - start_physical_index + 1;
738
739    // build new run_ends array by subtracting offset from run ends.
740    let offset = R::Native::usize_as(run_ends.offset());
741    let mut builder = BufferBuilder::<R::Native>::new(physical_length);
742    for run_end_value in &run_ends.values()[start_physical_index..end_physical_index] {
743        builder.append(run_end_value.sub_wrapping(offset));
744    }
745    builder.append(R::Native::from_usize(run_array.len()).unwrap());
746    let new_run_ends = unsafe {
747        // Safety:
748        // The function builds a valid run_ends array and hence need not be validated.
749        ArrayDataBuilder::new(R::DATA_TYPE)
750            .len(physical_length)
751            .add_buffer(builder.finish())
752            .build_unchecked()
753    };
754
755    // build new values by slicing physical indices.
756    let new_values = run_array
757        .values()
758        .slice(start_physical_index, physical_length)
759        .into_data();
760
761    let builder = ArrayDataBuilder::new(run_array.data_type().clone())
762        .len(run_array.len())
763        .add_child_data(new_run_ends)
764        .add_child_data(new_values);
765    let array_data = unsafe {
766        // Safety:
767        //  This function builds a valid run array and hence can skip validation.
768        builder.build_unchecked()
769    };
770    Ok(array_data.into())
771}
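// Worked example (illustrative): a run array with run_ends [2, 5, 9] sliced to
// offset 3 and length 4 covers logical rows 3..7. The start and end physical
// indices are 1 and 2, so the rebuilt run_ends become [5 - 3, 4] = [2, 4] and the
// values are sliced to the same physical range before the array is reassembled.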
772
773/// Controls how dictionaries are handled in Arrow IPC messages
774#[derive(Debug, Clone, Copy, PartialEq, Eq)]
775pub enum DictionaryHandling {
776    /// Send the entire dictionary every time it is encountered (default)
777    Resend,
778    /// Send only new dictionary values since the last batch (delta encoding)
779    ///
780    /// When a dictionary is first encountered, the entire dictionary is sent.
781    /// For subsequent batches, only values that are new (not previously sent)
782    /// are transmitted with the `isDelta` flag set to true.
783    Delta,
784}
785
786impl Default for DictionaryHandling {
787    fn default() -> Self {
788        Self::Resend
789    }
790}
791
792/// Describes what kind of update took place after a call to [`DictionaryTracker::insert_column`].
793#[derive(Debug, Clone)]
794pub enum DictionaryUpdate {
795    /// No dictionary was written; the data was identical to what was already
796    /// in the tracker.
797    None,
798    /// No dictionary was present in the tracker
799    New,
800    /// Dictionary was replaced with the new data
801    Replaced,
802    /// Dictionary was updated, ArrayData is the delta between old and new
803    Delta(ArrayData),
804}
805
806/// Keeps track of dictionaries that have been written, to avoid emitting the same dictionary
807/// multiple times.
808///
809/// Can optionally error if an update to an existing dictionary is attempted, which
810/// isn't allowed in the `FileWriter`.
811#[derive(Debug)]
812pub struct DictionaryTracker {
813    written: HashMap<i64, ArrayData>,
814    dict_ids: Vec<i64>,
815    error_on_replacement: bool,
816}
817
818impl DictionaryTracker {
819    /// Create a new [`DictionaryTracker`].
820    ///
821    /// If `error_on_replacement`
822    /// is true, an error will be generated if an update to an
823    /// existing dictionary is attempted.
824    pub fn new(error_on_replacement: bool) -> Self {
825        #[allow(deprecated)]
826        Self {
827            written: HashMap::new(),
828            dict_ids: Vec::new(),
829            error_on_replacement,
830        }
831    }
832
833    /// Record and return the next dictionary ID.
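    ///
    /// IDs are assigned sequentially starting at 0; for example, three calls on a
    /// fresh tracker return 0, 1, and 2, recording them in the tracker's ID sequence.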
834    pub fn next_dict_id(&mut self) -> i64 {
835        let next = self
836            .dict_ids
837            .last()
838            .copied()
839            .map(|i| i + 1)
840            .unwrap_or_default();
841
842        self.dict_ids.push(next);
843        next
844    }
845
846    /// Return the sequence of dictionary IDs in the order they should be observed while
847    /// traversing the schema
848    pub fn dict_id(&mut self) -> &[i64] {
849        &self.dict_ids
850    }
851
852    /// Keep track of the dictionary with the given ID and values. Behavior:
853    ///
854    /// * If this ID has been written already and has the same data, return `Ok(false)` to indicate
855    ///   that the dictionary was not actually inserted (because it's already been seen).
856    /// * If this ID has been written already but with different data, and this tracker is
857    ///   configured to return an error, return an error.
858    /// * If the tracker has not been configured to error on replacement or this dictionary
859    ///   has never been seen before, return `Ok(true)` to indicate that the dictionary was just
860    ///   inserted.
861    #[deprecated(since = "56.1.0", note = "Use `insert_column` instead")]
862    pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> {
863        let dict_data = column.to_data();
864        let dict_values = &dict_data.child_data()[0];
865
866        // If a dictionary with this id was already emitted, check if it was the same.
867        if let Some(last) = self.written.get(&dict_id) {
868            if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
869                // Same dictionary values => no need to emit it again
870                return Ok(false);
871            }
872            if self.error_on_replacement {
873                // If error on replacement perform a logical comparison
874                if last.child_data()[0] == *dict_values {
875                    // Same dictionary values => no need to emit it again
876                    return Ok(false);
877                }
878                return Err(ArrowError::InvalidArgumentError(
879                    "Dictionary replacement detected when writing IPC file format. \
880                     Arrow IPC files only support a single dictionary for a given field \
881                     across all batches."
882                        .to_string(),
883                ));
884            }
885        }
886
887        self.written.insert(dict_id, dict_data);
888        Ok(true)
889    }
890
891    /// Keep track of the dictionary with the given ID and values. The return
892    /// value indicates what, if any, update to the internal map took place
893    /// and how it should be interpreted based on the `dict_handling` parameter.
894    ///
895    /// # Returns
896    ///
897    /// * `Ok(DictionaryUpdate::New)` - If the dictionary was not previously written
898    /// * `Ok(DictionaryUpdate::Replaced)` - If the dictionary was previously written
899    ///   with completely different data, or if the data is a delta of the existing,
900    ///   but with `dict_handling` set to `DictionaryHandling::Resend`
901    /// * `Ok(DictionaryUpdate::Delta)` - If the dictionary was previously written, but
902    ///   the new data is a delta of the old and the `dict_handling` is set to
903    ///   `DictionaryHandling::Delta`
904    /// * `Err(e)` - If the dictionary was previously written with different data,
905    ///   and `error_on_replacement` is set to `true`.
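    ///
    /// A minimal, illustrative sketch (the dictionary values and the handling mode
    /// used here are hypothetical):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{types::Int32Type, ArrayRef, DictionaryArray};
    /// # use arrow_ipc::writer::{DictionaryHandling, DictionaryTracker, DictionaryUpdate};
    /// let mut tracker = DictionaryTracker::new(false);
    /// let dict_id = tracker.next_dict_id();
    ///
    /// let array: DictionaryArray<Int32Type> = vec!["a", "b"].into_iter().collect();
    /// let array: ArrayRef = Arc::new(array);
    ///
    /// // First time this dictionary is seen: the full dictionary must be written
    /// let update = tracker
    ///     .insert_column(dict_id, &array, DictionaryHandling::Delta)
    ///     .unwrap();
    /// assert!(matches!(update, DictionaryUpdate::New));
    ///
    /// // Same values again: nothing new needs to be written
    /// let update = tracker
    ///     .insert_column(dict_id, &array, DictionaryHandling::Delta)
    ///     .unwrap();
    /// assert!(matches!(update, DictionaryUpdate::None));
    /// ```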
906    pub fn insert_column(
907        &mut self,
908        dict_id: i64,
909        column: &ArrayRef,
910        dict_handling: DictionaryHandling,
911    ) -> Result<DictionaryUpdate, ArrowError> {
912        let new_data = column.to_data();
913        let new_values = &new_data.child_data()[0];
914
915        // If there is no existing dictionary with this ID, we always insert
916        let Some(old) = self.written.get(&dict_id) else {
917            self.written.insert(dict_id, new_data);
918            return Ok(DictionaryUpdate::New);
919        };
920
921        // Fast path - If the array data points to the same buffer as the
922        // existing then they're the same.
923        let old_values = &old.child_data()[0];
924        if ArrayData::ptr_eq(old_values, new_values) {
925            return Ok(DictionaryUpdate::None);
926        }
927
928        // Slow path - Compare the dictionaries value by value
929        let comparison = compare_dictionaries(old_values, new_values);
930        if matches!(comparison, DictionaryComparison::Equal) {
931            return Ok(DictionaryUpdate::None);
932        }
933
934        const REPLACEMENT_ERROR: &str = "Dictionary replacement detected when writing IPC file format. \
935                 Arrow IPC files only support a single dictionary for a given field \
936                 across all batches.";
937
938        match comparison {
939            DictionaryComparison::NotEqual => {
940                if self.error_on_replacement {
941                    return Err(ArrowError::InvalidArgumentError(
942                        REPLACEMENT_ERROR.to_string(),
943                    ));
944                }
945
946                self.written.insert(dict_id, new_data);
947                Ok(DictionaryUpdate::Replaced)
948            }
949            DictionaryComparison::Delta => match dict_handling {
950                DictionaryHandling::Resend => {
951                    if self.error_on_replacement {
952                        return Err(ArrowError::InvalidArgumentError(
953                            REPLACEMENT_ERROR.to_string(),
954                        ));
955                    }
956
957                    self.written.insert(dict_id, new_data);
958                    Ok(DictionaryUpdate::Replaced)
959                }
960                DictionaryHandling::Delta => {
961                    let delta =
962                        new_values.slice(old_values.len(), new_values.len() - old_values.len());
963                    self.written.insert(dict_id, new_data);
964                    Ok(DictionaryUpdate::Delta(delta))
965                }
966            },
967            DictionaryComparison::Equal => unreachable!("Already checked equal case"),
968        }
969    }
970}
971
972/// Describes how two dictionary arrays compare to each other.
973#[derive(Debug, Clone)]
974enum DictionaryComparison {
975    /// Neither a delta, nor an exact match
976    NotEqual,
977    /// Exact element-wise match
978    Equal,
979    /// The two arrays are dictionary deltas of each other, meaning the first
980    /// is a prefix of the second.
981    Delta,
982}
983
984// Compares two dictionaries and returns a [`DictionaryComparison`].
985fn compare_dictionaries(old: &ArrayData, new: &ArrayData) -> DictionaryComparison {
986    // Check for exact match
987    let existing_len = old.len();
988    let new_len = new.len();
989    if existing_len == new_len {
990        if *old == *new {
991            return DictionaryComparison::Equal;
992        } else {
993            return DictionaryComparison::NotEqual;
994        }
995    }
996
997    // Can't be a delta if the new is shorter than the existing
998    if new_len < existing_len {
999        return DictionaryComparison::NotEqual;
1000    }
1001
1002    // Check for delta
1003    if new.slice(0, existing_len) == *old {
1004        return DictionaryComparison::Delta;
1005    }
1006
1007    DictionaryComparison::NotEqual
1008}
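// Illustrative cases: old = ["a", "b"] vs new = ["a", "b", "c"] is a Delta (the old
// values are a prefix of the new); old = ["a", "b"] vs new = ["a", "x"] is NotEqual;
// identical arrays compare Equal. A `new` shorter than `old` can never be a delta.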
1009
1010/// Arrow File Writer
1011///
1012/// Writes Arrow [`RecordBatch`]es in the [IPC File Format].
1013///
1014/// # See Also
1015///
1016/// * [`StreamWriter`] for writing IPC Streams
1017///
1018/// # Example
1019/// ```
1020/// # use arrow_array::record_batch;
1021/// # use arrow_ipc::writer::FileWriter;
1022/// # let mut file = vec![]; // mimic a file for the example
1023/// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
1024/// // create a new writer, the schema must be known in advance
1025/// let mut writer = FileWriter::try_new(&mut file, &batch.schema()).unwrap();
1026/// // write each batch to the underlying writer
1027/// writer.write(&batch).unwrap();
1028/// // When all batches are written, call finish to flush all buffers
1029/// writer.finish().unwrap();
1030/// ```
1031/// [IPC File Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format
1032pub struct FileWriter<W> {
1033    /// The object to write to
1034    writer: W,
1035    /// IPC write options
1036    write_options: IpcWriteOptions,
1037    /// A reference to the schema, used in validating record batches
1038    schema: SchemaRef,
1039    /// The byte offset at which the next block will be written, used to record block locations for random access
1040    block_offsets: usize,
1041    /// Dictionary blocks that will be written as part of the IPC footer
1042    dictionary_blocks: Vec<crate::Block>,
1043    /// Record blocks that will be written as part of the IPC footer
1044    record_blocks: Vec<crate::Block>,
1045    /// Whether the writer footer has been written, and the writer is finished
1046    finished: bool,
1047    /// Keeps track of dictionaries that have been written
1048    dictionary_tracker: DictionaryTracker,
1049    /// User level customized metadata
1050    custom_metadata: HashMap<String, String>,
1051
1052    data_gen: IpcDataGenerator,
1053
1054    compression_context: CompressionContext,
1055}
1056
1057impl<W: Write> FileWriter<BufWriter<W>> {
1058    /// Try to create a new file writer with the writer wrapped in a BufWriter.
1059    ///
1060    /// See [`FileWriter::try_new`] for an unbuffered version.
1061    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1062        Self::try_new(BufWriter::new(writer), schema)
1063    }
1064}
1065
1066impl<W: Write> FileWriter<W> {
1067    /// Try to create a new writer, with the schema written as part of the header
1068    ///
1069    /// Note the created writer is not buffered. See [`FileWriter::try_new_buffered`] for details.
1070    ///
1071    /// # Errors
1072    ///
1073    /// An ['Err'](Result::Err) may be returned if writing the header to the writer fails.
1074    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1075        let write_options = IpcWriteOptions::default();
1076        Self::try_new_with_options(writer, schema, write_options)
1077    }
1078
1079    /// Try to create a new writer with IpcWriteOptions
1080    ///
1081    /// Note the created writer is not buffered. See [`FileWriter::try_new_buffered`] for details.
1082    ///
1083    /// # Errors
1084    ///
1085    /// An ['Err'](Result::Err) may be returned if writing the header to the writer fails.
1086    pub fn try_new_with_options(
1087        mut writer: W,
1088        schema: &Schema,
1089        write_options: IpcWriteOptions,
1090    ) -> Result<Self, ArrowError> {
1091        let data_gen = IpcDataGenerator::default();
1092        // write magic to header aligned on alignment boundary
1093        let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
1094        let header_size = super::ARROW_MAGIC.len() + pad_len;
1095        writer.write_all(&super::ARROW_MAGIC)?;
1096        writer.write_all(&PADDING[..pad_len])?;
1097        // write the schema, set the written bytes to the schema + header
1098        let mut dictionary_tracker = DictionaryTracker::new(true);
1099        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
1100            schema,
1101            &mut dictionary_tracker,
1102            &write_options,
1103        );
1104        let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
1105        Ok(Self {
1106            writer,
1107            write_options,
1108            schema: Arc::new(schema.clone()),
1109            block_offsets: meta + data + header_size,
1110            dictionary_blocks: vec![],
1111            record_blocks: vec![],
1112            finished: false,
1113            dictionary_tracker,
1114            custom_metadata: HashMap::new(),
1115            data_gen,
1116            compression_context: CompressionContext::default(),
1117        })
1118    }
1119
1120    /// Adds a key-value pair to the [FileWriter]'s custom metadata
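    ///
    /// Metadata added here is written into the file footer, so it must be added
    /// before [`FileWriter::finish`] is called. A small, illustrative sketch
    /// (the key and value are hypothetical):
    ///
    /// ```
    /// # use arrow_array::record_batch;
    /// # use arrow_ipc::writer::FileWriter;
    /// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
    /// let mut file = Vec::<u8>::new();
    /// let mut writer = FileWriter::try_new(&mut file, &batch.schema()).unwrap();
    /// writer.write_metadata("created_by", "arrow_ipc example");
    /// writer.write(&batch).unwrap();
    /// writer.finish().unwrap();
    /// ```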
1121    pub fn write_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
1122        self.custom_metadata.insert(key.into(), value.into());
1123    }
1124
1125    /// Write a record batch to the file
1126    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1127        if self.finished {
1128            return Err(ArrowError::IpcError(
1129                "Cannot write record batch to file writer as it is closed".to_string(),
1130            ));
1131        }
1132
1133        let (encoded_dictionaries, encoded_message) = self.data_gen.encode(
1134            batch,
1135            &mut self.dictionary_tracker,
1136            &self.write_options,
1137            &mut self.compression_context,
1138        )?;
1139
1140        for encoded_dictionary in encoded_dictionaries {
1141            let (meta, data) =
1142                write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
1143
1144            let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
1145            self.dictionary_blocks.push(block);
1146            self.block_offsets += meta + data;
1147        }
1148
1149        let (meta, data) = write_message(&mut self.writer, encoded_message, &self.write_options)?;
1150
1151        // add a record block for the footer
1152        let block = crate::Block::new(
1153            self.block_offsets as i64,
1154            meta as i32, // TODO: is this still applicable?
1155            data as i64,
1156        );
1157        self.record_blocks.push(block);
1158        self.block_offsets += meta + data;
1159        Ok(())
1160    }
1161
1162    /// Write footer and closing tag, then mark the writer as done
1163    pub fn finish(&mut self) -> Result<(), ArrowError> {
1164        if self.finished {
1165            return Err(ArrowError::IpcError(
1166                "Cannot write footer to file writer as it is closed".to_string(),
1167            ));
1168        }
1169
1170        // write EOS
1171        write_continuation(&mut self.writer, &self.write_options, 0)?;
1172
1173        let mut fbb = FlatBufferBuilder::new();
1174        let dictionaries = fbb.create_vector(&self.dictionary_blocks);
1175        let record_batches = fbb.create_vector(&self.record_blocks);
1176        let mut dictionary_tracker = DictionaryTracker::new(true);
1177        let schema = IpcSchemaEncoder::new()
1178            .with_dictionary_tracker(&mut dictionary_tracker)
1179            .schema_to_fb_offset(&mut fbb, &self.schema);
1180        let fb_custom_metadata = (!self.custom_metadata.is_empty())
1181            .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));
1182
1183        let root = {
1184            let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
1185            footer_builder.add_version(self.write_options.metadata_version);
1186            footer_builder.add_schema(schema);
1187            footer_builder.add_dictionaries(dictionaries);
1188            footer_builder.add_recordBatches(record_batches);
1189            if let Some(fb_custom_metadata) = fb_custom_metadata {
1190                footer_builder.add_custom_metadata(fb_custom_metadata);
1191            }
1192            footer_builder.finish()
1193        };
1194        fbb.finish(root, None);
1195        let footer_data = fbb.finished_data();
1196        self.writer.write_all(footer_data)?;
1197        self.writer
1198            .write_all(&(footer_data.len() as i32).to_le_bytes())?;
1199        self.writer.write_all(&super::ARROW_MAGIC)?;
1200        self.writer.flush()?;
1201        self.finished = true;
1202
1203        Ok(())
1204    }
1205
1206    /// Returns the arrow [`SchemaRef`] for this arrow file.
1207    pub fn schema(&self) -> &SchemaRef {
1208        &self.schema
1209    }
1210
1211    /// Gets a reference to the underlying writer.
1212    pub fn get_ref(&self) -> &W {
1213        &self.writer
1214    }
1215
1216    /// Gets a mutable reference to the underlying writer.
1217    ///
1218    /// It is inadvisable to directly write to the underlying writer.
1219    pub fn get_mut(&mut self) -> &mut W {
1220        &mut self.writer
1221    }
1222
1223    /// Flush the underlying writer.
1224    ///
1225    /// Both the BufWriter and the underlying writer are flushed.
1226    pub fn flush(&mut self) -> Result<(), ArrowError> {
1227        self.writer.flush()?;
1228        Ok(())
1229    }
1230
1231    /// Unwraps the underlying writer.
1232    ///
1233    /// The writer is flushed and the FileWriter is finished before returning.
1234    ///
1235    /// # Errors
1236    ///
1237    /// An ['Err'](Result::Err) may be returned if an error occurs while finishing the FileWriter
1238    /// or while flushing the writer.
1239    pub fn into_inner(mut self) -> Result<W, ArrowError> {
1240        if !self.finished {
1241            // `finish` flushes the writer.
1242            self.finish()?;
1243        }
1244        Ok(self.writer)
1245    }
1246}
1247
1248impl<W: Write> RecordBatchWriter for FileWriter<W> {
1249    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1250        self.write(batch)
1251    }
1252
1253    fn close(mut self) -> Result<(), ArrowError> {
1254        self.finish()
1255    }
1256}
1257
1258/// Arrow Stream Writer
1259///
1260/// Writes Arrow [`RecordBatch`]es to bytes using the [IPC Streaming Format].
1261///
1262/// # See Also
1263///
1264/// * [`FileWriter`] for writing IPC Files
1265///
1266/// # Example - Basic usage
1267/// ```
1268/// # use arrow_array::record_batch;
1269/// # use arrow_ipc::writer::StreamWriter;
1270/// # let mut stream = vec![]; // mimic a stream for the example
1271/// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
1272/// // create a new writer, the schema must be known in advance
1273/// let mut writer = StreamWriter::try_new(&mut stream, &batch.schema()).unwrap();
1274/// // write each batch to the underlying stream
1275/// writer.write(&batch).unwrap();
1276/// // When all batches are written, call finish to flush all buffers
1277/// writer.finish().unwrap();
1278/// ```
1279/// # Example - Efficient delta dictionaries
1280/// ```
1281/// # use arrow_array::record_batch;
1282/// # use arrow_ipc::writer::{StreamWriter, IpcWriteOptions};
1283/// # use arrow_ipc::writer::DictionaryHandling;
1284/// # use arrow_schema::{DataType, Field, Schema, SchemaRef};
1285/// # use arrow_array::{
1286/// #    builder::StringDictionaryBuilder, types::Int32Type, Array, ArrayRef, DictionaryArray,
1287/// #    RecordBatch, StringArray,
1288/// # };
1289/// # use std::sync::Arc;
1290///
1291/// let schema = Arc::new(Schema::new(vec![Field::new(
1292///    "col1",
1293///    DataType::Dictionary(Box::from(DataType::Int32), Box::from(DataType::Utf8)),
1294///    true,
1295/// )]));
1296///
1297/// let mut builder = StringDictionaryBuilder::<arrow_array::types::Int32Type>::new();
1298///
1299/// // `finish_preserve_values` will keep the dictionary values along with their
1300/// // key assignments so that they can be re-used in the next batch.
1301/// builder.append("a").unwrap();
1302/// builder.append("b").unwrap();
1303/// let array1 = builder.finish_preserve_values();
1304/// let batch1 = RecordBatch::try_new(schema.clone(), vec![Arc::new(array1) as ArrayRef]).unwrap();
1305///
1306/// // In this batch, 'a' will have the same dictionary key as 'a' in the previous batch,
1307/// // and 'd' will take the next available key.
1308/// builder.append("a").unwrap();
1309/// builder.append("d").unwrap();
1310/// let array2 = builder.finish_preserve_values();
1311/// let batch2 = RecordBatch::try_new(schema.clone(), vec![Arc::new(array2) as ArrayRef]).unwrap();
1312///
1313/// let mut stream = vec![];
1314/// // You must set `.with_dictionary_handling(DictionaryHandling::Delta)` to
1315/// // enable delta dictionaries in the writer
1316/// let options = IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta);
1317/// let mut writer = StreamWriter::try_new_with_options(&mut stream, &schema, options).unwrap();
1318///
1319/// // When writing the first batch, a dictionary message with 'a' and 'b' will be written
1320/// // prior to the record batch.
1321/// writer.write(&batch1).unwrap();
1322/// // With the second batch only a delta dictionary with 'd' will be written
1323/// // prior to the record batch. This is only possible with `finish_preserve_values`.
1324/// // Without it, 'a' and 'd' in this batch would have different keys than the
1325/// // first batch and so we'd have to send a replacement dictionary with new keys
1326/// // for both.
1327/// writer.write(&batch2).unwrap();
1328/// writer.finish().unwrap();
1329/// ```
1330/// [IPC Streaming Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
1331pub struct StreamWriter<W> {
1332    /// The object to write to
1333    writer: W,
1334    /// IPC write options
1335    write_options: IpcWriteOptions,
1336    /// Whether the writer footer has been written, and the writer is finished
1337    finished: bool,
1338    /// Keeps track of dictionaries that have been written
1339    dictionary_tracker: DictionaryTracker,
1340
1341    data_gen: IpcDataGenerator,
1342
1343    compression_context: CompressionContext,
1344}
1345
1346impl<W: Write> StreamWriter<BufWriter<W>> {
1347    /// Try to create a new stream writer with the writer wrapped in a BufWriter.
1348    ///
1349    /// See [`StreamWriter::try_new`] for an unbuffered version.
1350    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1351        Self::try_new(BufWriter::new(writer), schema)
1352    }
1353}
1354
1355impl<W: Write> StreamWriter<W> {
1356    /// Try to create a new writer, with the schema written as part of the header.
1357    ///
1358    /// Note that there is no internal buffering. See also [`StreamWriter::try_new_buffered`].
1359    ///
1360    /// # Errors
1361    ///
1362    /// An ['Err'](Result::Err) may be returned if writing the header to the writer fails.
1363    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1364        let write_options = IpcWriteOptions::default();
1365        Self::try_new_with_options(writer, schema, write_options)
1366    }
1367
1368    /// Try to create a new writer with [`IpcWriteOptions`].
1369    ///
1370    /// # Errors
1371    ///
1372    /// An ['Err'](Result::Err) may be returned if writing the header to the writer fails.
1373    pub fn try_new_with_options(
1374        mut writer: W,
1375        schema: &Schema,
1376        write_options: IpcWriteOptions,
1377    ) -> Result<Self, ArrowError> {
1378        let data_gen = IpcDataGenerator::default();
1379        let mut dictionary_tracker = DictionaryTracker::new(false);
1380
1381        // write the schema, set the written bytes to the schema
1382        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
1383            schema,
1384            &mut dictionary_tracker,
1385            &write_options,
1386        );
1387        write_message(&mut writer, encoded_message, &write_options)?;
1388        Ok(Self {
1389            writer,
1390            write_options,
1391            finished: false,
1392            dictionary_tracker,
1393            data_gen,
1394            compression_context: CompressionContext::default(),
1395        })
1396    }
1397
1398    /// Write a record batch to the stream
1399    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1400        if self.finished {
1401            return Err(ArrowError::IpcError(
1402                "Cannot write record batch to stream writer as it is closed".to_string(),
1403            ));
1404        }
1405
1406        let (encoded_dictionaries, encoded_message) = self
1407            .data_gen
1408            .encode(
1409                batch,
1410                &mut self.dictionary_tracker,
1411                &self.write_options,
1412                &mut self.compression_context,
1413            )
1414            .expect("StreamWriter is configured to not error on dictionary replacement");
1415
1416        for encoded_dictionary in encoded_dictionaries {
1417            write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
1418        }
1419
1420        write_message(&mut self.writer, encoded_message, &self.write_options)?;
1421        Ok(())
1422    }
1423
1424    /// Write continuation bytes, and mark the stream as done
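    ///
    /// For a stream written with the default (non-legacy, V5) options this appends the
    /// 8-byte end-of-stream marker `FF FF FF FF 00 00 00 00`: the continuation marker
    /// followed by a zero message length, as in the trailing bytes of the `into_inner`
    /// doc example below.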
1425    pub fn finish(&mut self) -> Result<(), ArrowError> {
1426        if self.finished {
1427            return Err(ArrowError::IpcError(
1428                "Cannot write footer to stream writer as it is closed".to_string(),
1429            ));
1430        }
1431
1432        write_continuation(&mut self.writer, &self.write_options, 0)?;
1433
1434        self.finished = true;
1435
1436        Ok(())
1437    }
1438
1439    /// Gets a reference to the underlying writer.
1440    pub fn get_ref(&self) -> &W {
1441        &self.writer
1442    }
1443
1444    /// Gets a mutable reference to the underlying writer.
1445    ///
1446    /// It is inadvisable to directly write to the underlying writer.
1447    pub fn get_mut(&mut self) -> &mut W {
1448        &mut self.writer
1449    }
1450
1451    /// Flush the underlying writer.
1452    ///
1453    /// If the writer is a [`BufWriter`], both its buffer and the underlying writer are flushed.
1454    pub fn flush(&mut self) -> Result<(), ArrowError> {
1455        self.writer.flush()?;
1456        Ok(())
1457    }
1458
1459    /// Unwraps the underlying writer.
1460    ///
1461    /// The writer is flushed and the StreamWriter is finished before returning.
1462    ///
1463    /// # Errors
1464    ///
1465    /// An [`Err`](Result::Err) may be returned if an error occurs while finishing the StreamWriter
1466    /// or while flushing the writer.
1467    ///
1468    /// # Example
1469    ///
1470    /// ```
1471    /// # use arrow_ipc::writer::{StreamWriter, IpcWriteOptions};
1472    /// # use arrow_ipc::MetadataVersion;
1473    /// # use arrow_schema::{ArrowError, Schema};
1474    /// # fn main() -> Result<(), ArrowError> {
1475    /// // The result we expect from an empty schema
1476    /// let expected = vec![
1477    ///     255, 255, 255, 255,  48,   0,   0,   0,
1478    ///      16,   0,   0,   0,   0,   0,  10,   0,
1479    ///      12,   0,  10,   0,   9,   0,   4,   0,
1480    ///      10,   0,   0,   0,  16,   0,   0,   0,
1481    ///       0,   1,   4,   0,   8,   0,   8,   0,
1482    ///       0,   0,   4,   0,   8,   0,   0,   0,
1483    ///       4,   0,   0,   0,   0,   0,   0,   0,
1484    ///     255, 255, 255, 255,   0,   0,   0,   0
1485    /// ];
1486    ///
1487    /// let schema = Schema::empty();
1488    /// let buffer: Vec<u8> = Vec::new();
1489    /// let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)?;
1490    /// let stream_writer = StreamWriter::try_new_with_options(buffer, &schema, options)?;
1491    ///
1492    /// assert_eq!(stream_writer.into_inner()?, expected);
1493    /// # Ok(())
1494    /// # }
1495    /// ```
1496    pub fn into_inner(mut self) -> Result<W, ArrowError> {
1497        if !self.finished {
1498            // `finish` flushes.
1499            self.finish()?;
1500        }
1501        Ok(self.writer)
1502    }
1503}
1504
1505impl<W: Write> RecordBatchWriter for StreamWriter<W> {
1506    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1507        self.write(batch)
1508    }
1509
1510    fn close(mut self) -> Result<(), ArrowError> {
1511        self.finish()
1512    }
1513}
1514
1515/// Stores the encoded data, which is a `crate::Message`, and optional Arrow data
1516pub struct EncodedData {
1517    /// An encoded crate::Message
1518    pub ipc_message: Vec<u8>,
1519    /// Arrow buffers to be written, should be an empty vec for schema messages
1520    pub arrow_data: Vec<u8>,
1521}
1522/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
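///
/// # Example
///
/// A minimal sketch (using an empty schema for illustration) of writing an encoded
/// schema message to an in-memory buffer:
///
/// ```
/// # use arrow_ipc::writer::{
/// #     write_message, DictionaryTracker, IpcDataGenerator, IpcWriteOptions,
/// # };
/// # use arrow_schema::Schema;
/// let schema = Schema::empty();
/// let options = IpcWriteOptions::default();
/// let mut tracker = DictionaryTracker::new(false);
/// let encoded = IpcDataGenerator::default().schema_to_bytes_with_dictionary_tracker(
///     &schema,
///     &mut tracker,
///     &options,
/// );
///
/// let mut buffer: Vec<u8> = Vec::new();
/// // Returns (metadata bytes written, body bytes written); a schema message has no body.
/// let (_metadata_len, body_len) = write_message(&mut buffer, encoded, &options).unwrap();
/// assert_eq!(body_len, 0);
/// ```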
1523pub fn write_message<W: Write>(
1524    mut writer: W,
1525    encoded: EncodedData,
1526    write_options: &IpcWriteOptions,
1527) -> Result<(usize, usize), ArrowError> {
1528    let arrow_data_len = encoded.arrow_data.len();
1529    if arrow_data_len % usize::from(write_options.alignment) != 0 {
1530        return Err(ArrowError::MemoryError(
1531            "Arrow data not aligned".to_string(),
1532        ));
1533    }
1534
1535    let a = usize::from(write_options.alignment - 1);
1536    let buffer = encoded.ipc_message;
1537    let flatbuf_size = buffer.len();
1538    let prefix_size = if write_options.write_legacy_ipc_format {
1539        4
1540    } else {
1541        8
1542    };
1543    let aligned_size = (flatbuf_size + prefix_size + a) & !a;
1544    let padding_bytes = aligned_size - flatbuf_size - prefix_size;
1545
1546    write_continuation(
1547        &mut writer,
1548        write_options,
1549        (aligned_size - prefix_size) as i32,
1550    )?;
1551
1552    // write the flatbuf
1553    if flatbuf_size > 0 {
1554        writer.write_all(&buffer)?;
1555    }
1556    // write padding
1557    writer.write_all(&PADDING[..padding_bytes])?;
1558
1559    // write arrow data
1560    let body_len = if arrow_data_len > 0 {
1561        write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
1562    } else {
1563        0
1564    };
1565
1566    Ok((aligned_size, body_len))
1567}
1568
1569fn write_body_buffers<W: Write>(
1570    mut writer: W,
1571    data: &[u8],
1572    alignment: u8,
1573) -> Result<usize, ArrowError> {
1574    let len = data.len();
1575    let pad_len = pad_to_alignment(alignment, len);
1576    let total_len = len + pad_len;
1577
1578    // write body buffer
1579    writer.write_all(data)?;
1580    if pad_len > 0 {
1581        writer.write_all(&PADDING[..pad_len])?;
1582    }
1583
1584    writer.flush()?;
1585    Ok(total_len)
1586}
1587
1588/// Write a continuation marker (when required by the metadata version) and the message
1589/// length prefix to the writer, returning the number of prefix bytes written
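///
/// A sketch of the prefix bytes this writes before each message flatbuffer:
///
/// ```text
/// FF FF FF FF <i32 LE length>   // V5, and V4 without the legacy format (8 bytes)
///             <i32 LE length>   // legacy V4 format (4 bytes)
/// ```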
1590fn write_continuation<W: Write>(
1591    mut writer: W,
1592    write_options: &IpcWriteOptions,
1593    total_len: i32,
1594) -> Result<usize, ArrowError> {
1595    let mut written = 8;
1596
1597    // the version of the writer determines whether continuation markers should be added
1598    match write_options.metadata_version {
1599        crate::MetadataVersion::V1 | crate::MetadataVersion::V2 | crate::MetadataVersion::V3 => {
1600            unreachable!("Options with the metadata version cannot be created")
1601        }
1602        crate::MetadataVersion::V4 => {
1603            if !write_options.write_legacy_ipc_format {
1604                // v0.15.0 format
1605                writer.write_all(&CONTINUATION_MARKER)?;
1606                written = 4;
1607            }
1608            writer.write_all(&total_len.to_le_bytes()[..])?;
1609        }
1610        crate::MetadataVersion::V5 => {
1611            // write continuation marker and message length
1612            writer.write_all(&CONTINUATION_MARKER)?;
1613            writer.write_all(&total_len.to_le_bytes()[..])?;
1614        }
1615        z => panic!("Unsupported crate::MetadataVersion {z:?}"),
1616    };
1617
1618    writer.flush()?;
1619
1620    Ok(written)
1621}
1622
1623/// In V4, only null types have no validity bitmap.
1624/// In V5 and later, null, union, and run-end encoded types
1625/// have no validity bitmap.
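///
/// For example, a union array written with V4 metadata still gets a validity buffer
/// (an all-set bitmap is synthesized in `write_array_data` when the array has no null
/// buffer), while with V5 metadata no validity buffer is written for it.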
1626fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) -> bool {
1627    if write_options.metadata_version < crate::MetadataVersion::V5 {
1628        !matches!(data_type, DataType::Null)
1629    } else {
1630        !matches!(
1631            data_type,
1632            DataType::Null | DataType::Union(_, _) | DataType::RunEndEncoded(_, _)
1633        )
1634    }
1635}
1636
1637/// Whether the buffer needs to be truncated, i.e. whether the array has a non-zero offset or covers less than the full backing buffer
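///
/// For example (illustrative values): an `Int32` array sliced to `offset = 2`, `len = 4`
/// over a 64-byte buffer has `min_length = 4 * 4 = 16 < 64`, so the buffer needs to be
/// truncated before writing.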
1638#[inline]
1639fn buffer_need_truncate(
1640    array_offset: usize,
1641    buffer: &Buffer,
1642    spec: &BufferSpec,
1643    min_length: usize,
1644) -> bool {
1645    spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
1646}
1647
1648/// Returns the byte width for a `BufferSpec::FixedWidth` buffer spec, and 0 for any other spec.
1649#[inline]
1650fn get_buffer_element_width(spec: &BufferSpec) -> usize {
1651    match spec {
1652        BufferSpec::FixedWidth { byte_width, .. } => *byte_width,
1653        _ => 0,
1654    }
1655}
1656
1657/// Common functionality for re-encoding offsets. Returns the new offsets as well as
1658/// original start offset and length for use in slicing child data.
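///
/// For example (purely illustrative values): offsets `[0, 2, 5, 9]` for an array sliced
/// to `offset = 1`, `len = 2` give the window `[2, 5, 9]`; since the first offset is
/// non-zero they are rebased to `[0, 3, 7]`, and the function returns them together with
/// a start offset of `2` and a length of `9 - 2 = 7` for slicing the values or child data.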
1659fn reencode_offsets<O: OffsetSizeTrait>(
1660    offsets: &Buffer,
1661    data: &ArrayData,
1662) -> (Buffer, usize, usize) {
1663    let offsets_slice: &[O] = offsets.typed_data::<O>();
1664    let offset_slice = &offsets_slice[data.offset()..data.offset() + data.len() + 1];
1665
1666    let start_offset = offset_slice.first().unwrap();
1667    let end_offset = offset_slice.last().unwrap();
1668
1669    let offsets = match start_offset.as_usize() {
1670        0 => {
1671            let size = size_of::<O>();
1672            offsets.slice_with_length(data.offset() * size, (data.len() + 1) * size)
1673        }
1674        _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
1675    };
1676
1677    let start_offset = start_offset.as_usize();
1678    let end_offset = end_offset.as_usize();
1679
1680    (offsets, start_offset, end_offset - start_offset)
1681}
1682
1683/// Returns the values and offsets [`Buffer`] for a ByteArray with offset type `O`
1684///
1685/// In particular, this handles re-encoding the offsets if they don't start at `0`,
1686/// slicing the values buffer as appropriate. This helps reduce the encoded
1687/// size of sliced arrays, as values that have been sliced away are not encoded
1688fn get_byte_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Buffer) {
1689    if data.is_empty() {
1690        return (MutableBuffer::new(0).into(), MutableBuffer::new(0).into());
1691    }
1692
1693    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1694    let values = data.buffers()[1].slice_with_length(original_start_offset, len);
1695    (offsets, values)
1696}
1697
1698/// Similar logic as [`get_byte_array_buffers()`] but slices the child array instead
1699/// of a values buffer.
1700fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, ArrayData) {
1701    if data.is_empty() {
1702        return (
1703            MutableBuffer::new(0).into(),
1704            data.child_data()[0].slice(0, 0),
1705        );
1706    }
1707
1708    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1709    let child_data = data.child_data()[0].slice(original_start_offset, len);
1710    (offsets, child_data)
1711}
1712
1713/// Write array data to a vector of bytes
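///
/// Returns the updated body offset after this array's buffers (and, recursively, those of
/// its children) have been appended to `arrow_data`.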
1714#[allow(clippy::too_many_arguments)]
1715fn write_array_data(
1716    array_data: &ArrayData,
1717    buffers: &mut Vec<crate::Buffer>,
1718    arrow_data: &mut Vec<u8>,
1719    nodes: &mut Vec<crate::FieldNode>,
1720    offset: i64,
1721    num_rows: usize,
1722    null_count: usize,
1723    compression_codec: Option<CompressionCodec>,
1724    compression_context: &mut CompressionContext,
1725    write_options: &IpcWriteOptions,
1726) -> Result<i64, ArrowError> {
1727    let mut offset = offset;
1728    if !matches!(array_data.data_type(), DataType::Null) {
1729        nodes.push(crate::FieldNode::new(num_rows as i64, null_count as i64));
1730    } else {
1731        // A NullArray's null_count equals its length, but the `null_count` passed in comes from
1732        // ArrayData, where it is always 0.
1733        nodes.push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
1734    }
1735    if has_validity_bitmap(array_data.data_type(), write_options) {
1736        // write null buffer if exists
1737        let null_buffer = match array_data.nulls() {
1738            None => {
1739                // create a buffer and fill it with valid bits
1740                let num_bytes = bit_util::ceil(num_rows, 8);
1741                let buffer = MutableBuffer::new(num_bytes);
1742                let buffer = buffer.with_bitset(num_bytes, true);
1743                buffer.into()
1744            }
1745            Some(buffer) => buffer.inner().sliced(),
1746        };
1747
1748        offset = write_buffer(
1749            null_buffer.as_slice(),
1750            buffers,
1751            arrow_data,
1752            offset,
1753            compression_codec,
1754            compression_context,
1755            write_options.alignment,
1756        )?;
1757    }
1758
1759    let data_type = array_data.data_type();
1760    if matches!(data_type, DataType::Binary | DataType::Utf8) {
1761        let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
1762        for buffer in [offsets, values] {
1763            offset = write_buffer(
1764                buffer.as_slice(),
1765                buffers,
1766                arrow_data,
1767                offset,
1768                compression_codec,
1769                compression_context,
1770                write_options.alignment,
1771            )?;
1772        }
1773    } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
1774        // Slicing the views buffer is safe and easy,
1775        // but pruning unneeded data buffers is much more nuanced since it's complicated to prove that no views reference the pruned buffers
1776        //
1777        // The current implementation just serializes the raw arrays as given and does not try to optimize anything.
1778        // If users want to "compact" the arrays prior to sending them over IPC,
1779        // they should consider the gc API suggested in #5513.
1780        for buffer in array_data.buffers() {
1781            offset = write_buffer(
1782                buffer.as_slice(),
1783                buffers,
1784                arrow_data,
1785                offset,
1786                compression_codec,
1787                compression_context,
1788                write_options.alignment,
1789            )?;
1790        }
1791    } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
1792        let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
1793        for buffer in [offsets, values] {
1794            offset = write_buffer(
1795                buffer.as_slice(),
1796                buffers,
1797                arrow_data,
1798                offset,
1799                compression_codec,
1800                compression_context,
1801                write_options.alignment,
1802            )?;
1803        }
1804    } else if DataType::is_numeric(data_type)
1805        || DataType::is_temporal(data_type)
1806        || matches!(
1807            array_data.data_type(),
1808            DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
1809        )
1810    {
1811        // Truncate values
1812        assert_eq!(array_data.buffers().len(), 1);
1813
1814        let buffer = &array_data.buffers()[0];
1815        let layout = layout(data_type);
1816        let spec = &layout.buffers[0];
1817
1818        let byte_width = get_buffer_element_width(spec);
1819        let min_length = array_data.len() * byte_width;
1820        let buffer_slice = if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
1821            let byte_offset = array_data.offset() * byte_width;
1822            let buffer_length = min(min_length, buffer.len() - byte_offset);
1823            &buffer.as_slice()[byte_offset..(byte_offset + buffer_length)]
1824        } else {
1825            buffer.as_slice()
1826        };
1827        offset = write_buffer(
1828            buffer_slice,
1829            buffers,
1830            arrow_data,
1831            offset,
1832            compression_codec,
1833            compression_context,
1834            write_options.alignment,
1835        )?;
1836    } else if matches!(data_type, DataType::Boolean) {
1837        // Bools are special because the payload (= 1 bit) is smaller than the physical container elements (= bytes).
1838        // The array data may not start at the physical boundary of the underlying buffer, so we need to shift bits around.
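        // For example (illustrative only): an array with offset 3 and length 5 stores its
        // values in bits 3..8 of the first byte; `bit_slice` below copies them into a new
        // buffer where they start at bit 0 and are padded out to whole bytes.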
1839        assert_eq!(array_data.buffers().len(), 1);
1840
1841        let buffer = &array_data.buffers()[0];
1842        let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
1843        offset = write_buffer(
1844            &buffer,
1845            buffers,
1846            arrow_data,
1847            offset,
1848            compression_codec,
1849            compression_context,
1850            write_options.alignment,
1851        )?;
1852    } else if matches!(
1853        data_type,
1854        DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _)
1855    ) {
1856        assert_eq!(array_data.buffers().len(), 1);
1857        assert_eq!(array_data.child_data().len(), 1);
1858
1859        // Truncate offsets and the child data to avoid writing unnecessary data
1860        let (offsets, sliced_child_data) = match data_type {
1861            DataType::List(_) => get_list_array_buffers::<i32>(array_data),
1862            DataType::Map(_, _) => get_list_array_buffers::<i32>(array_data),
1863            DataType::LargeList(_) => get_list_array_buffers::<i64>(array_data),
1864            _ => unreachable!(),
1865        };
1866        offset = write_buffer(
1867            offsets.as_slice(),
1868            buffers,
1869            arrow_data,
1870            offset,
1871            compression_codec,
1872            compression_context,
1873            write_options.alignment,
1874        )?;
1875        offset = write_array_data(
1876            &sliced_child_data,
1877            buffers,
1878            arrow_data,
1879            nodes,
1880            offset,
1881            sliced_child_data.len(),
1882            sliced_child_data.null_count(),
1883            compression_codec,
1884            compression_context,
1885            write_options,
1886        )?;
1887        return Ok(offset);
1888    } else if let DataType::FixedSizeList(_, fixed_size) = data_type {
1889        assert_eq!(array_data.child_data().len(), 1);
1890        let fixed_size = *fixed_size as usize;
1891
1892        let child_offset = array_data.offset() * fixed_size;
1893        let child_length = array_data.len() * fixed_size;
1894        let child_data = array_data.child_data()[0].slice(child_offset, child_length);
1895
1896        offset = write_array_data(
1897            &child_data,
1898            buffers,
1899            arrow_data,
1900            nodes,
1901            offset,
1902            child_data.len(),
1903            child_data.null_count(),
1904            compression_codec,
1905            compression_context,
1906            write_options,
1907        )?;
1908        return Ok(offset);
1909    } else {
1910        for buffer in array_data.buffers() {
1911            offset = write_buffer(
1912                buffer,
1913                buffers,
1914                arrow_data,
1915                offset,
1916                compression_codec,
1917                compression_context,
1918                write_options.alignment,
1919            )?;
1920        }
1921    }
1922
1923    match array_data.data_type() {
1924        DataType::Dictionary(_, _) => {}
1925        DataType::RunEndEncoded(_, _) => {
1926            // unslice the run encoded array.
1927            let arr = unslice_run_array(array_data.clone())?;
1928            // recursively write out nested structures
1929            for data_ref in arr.child_data() {
1930                // write the nested data (e.g list data)
1931                offset = write_array_data(
1932                    data_ref,
1933                    buffers,
1934                    arrow_data,
1935                    nodes,
1936                    offset,
1937                    data_ref.len(),
1938                    data_ref.null_count(),
1939                    compression_codec,
1940                    compression_context,
1941                    write_options,
1942                )?;
1943            }
1944        }
1945        _ => {
1946            // recursively write out nested structures
1947            for data_ref in array_data.child_data() {
1948                // write the nested data (e.g list data)
1949                offset = write_array_data(
1950                    data_ref,
1951                    buffers,
1952                    arrow_data,
1953                    nodes,
1954                    offset,
1955                    data_ref.len(),
1956                    data_ref.null_count(),
1957                    compression_codec,
1958                    compression_context,
1959                    write_options,
1960                )?;
1961            }
1962        }
1963    }
1964    Ok(offset)
1965}
1966
1967/// Writes a buffer into `arrow_data`, a vector of bytes, and adds its
1968/// [`crate::Buffer`] descriptor to `buffers`. Returns the new offset in `arrow_data`.
1969///
1970///
1971/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
1972/// Each constituent buffer is first compressed with the indicated
1973/// compressor, and then written with the uncompressed length in the first 8
1974/// bytes as a 64-bit little-endian signed integer followed by the compressed
1975/// buffer bytes (and then padding as required by the protocol). The
1976/// uncompressed length may be set to -1 to indicate that the data that
1977/// follows is not compressed, which can be useful for cases where
1978/// compression does not yield appreciable savings.
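///
/// For example (illustrative values, no compression): writing a 10-byte buffer at
/// `offset = 0` with 8-byte alignment appends the 10 data bytes plus 6 padding bytes to
/// `arrow_data`, records a `crate::Buffer` with offset 0 and length 10 in `buffers`, and
/// returns the new offset 16.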
1979fn write_buffer(
1980    buffer: &[u8],                    // input
1981    buffers: &mut Vec<crate::Buffer>, // output buffer descriptors
1982    arrow_data: &mut Vec<u8>,         // output stream
1983    offset: i64,                      // current output stream offset
1984    compression_codec: Option<CompressionCodec>,
1985    compression_context: &mut CompressionContext,
1986    alignment: u8,
1987) -> Result<i64, ArrowError> {
1988    let len: i64 = match compression_codec {
1989        Some(compressor) => compressor.compress_to_vec(buffer, arrow_data, compression_context)?,
1990        None => {
1991            arrow_data.extend_from_slice(buffer);
1992            buffer.len()
1993        }
1994    }
1995    .try_into()
1996    .map_err(|e| {
1997        ArrowError::InvalidArgumentError(format!("Could not convert compressed size to i64: {e}"))
1998    })?;
1999
2000    // make new index entry
2001    buffers.push(crate::Buffer::new(offset, len));
2002    // padding and make offset aligned
2003    let pad_len = pad_to_alignment(alignment, len as usize);
2004    arrow_data.extend_from_slice(&PADDING[..pad_len]);
2005
2006    Ok(offset + len + (pad_len as i64))
2007}
2008
2009const PADDING: [u8; 64] = [0; 64];
2010
2011/// Returns the number of padding bytes needed to round `len` up to the next multiple of `alignment`
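///
/// For example: `pad_to_alignment(64, 100)` is `((100 + 63) & !63) - 100 = 128 - 100 = 28`,
/// while a length that is already a multiple of the alignment needs 0 padding bytes.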
2012#[inline]
2013fn pad_to_alignment(alignment: u8, len: usize) -> usize {
2014    let a = usize::from(alignment - 1);
2015    ((len + a) & !a) - len
2016}
2017
2018#[cfg(test)]
2019mod tests {
2020    use std::hash::Hasher;
2021    use std::io::Cursor;
2022    use std::io::Seek;
2023
2024    use arrow_array::builder::FixedSizeListBuilder;
2025    use arrow_array::builder::Float32Builder;
2026    use arrow_array::builder::Int64Builder;
2027    use arrow_array::builder::MapBuilder;
2028    use arrow_array::builder::UnionBuilder;
2029    use arrow_array::builder::{GenericListBuilder, ListBuilder, StringBuilder};
2030    use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
2031    use arrow_array::types::*;
2032    use arrow_buffer::ScalarBuffer;
2033
2034    use crate::MetadataVersion;
2035    use crate::convert::fb_to_schema;
2036    use crate::reader::*;
2037    use crate::root_as_footer;
2038
2039    use super::*;
2040
2041    fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
2042        let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
2043        writer.write(rb).unwrap();
2044        writer.finish().unwrap();
2045        writer.into_inner().unwrap()
2046    }
2047
2048    fn deserialize_file(bytes: Vec<u8>) -> RecordBatch {
2049        let mut reader = FileReader::try_new(Cursor::new(bytes), None).unwrap();
2050        reader.next().unwrap().unwrap()
2051    }
2052
2053    fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
2054        // Use 8-byte alignment so that the various `truncate_*` tests can be compactly written,
2055        // without needing to construct a giant array to spill over the 64-byte default alignment
2056        // boundary.
2057        const IPC_ALIGNMENT: usize = 8;
2058
2059        let mut stream_writer = StreamWriter::try_new_with_options(
2060            vec![],
2061            record.schema_ref(),
2062            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
2063        )
2064        .unwrap();
2065        stream_writer.write(record).unwrap();
2066        stream_writer.finish().unwrap();
2067        stream_writer.into_inner().unwrap()
2068    }
2069
2070    fn deserialize_stream(bytes: Vec<u8>) -> RecordBatch {
2071        let mut stream_reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
2072        stream_reader.next().unwrap().unwrap()
2073    }
2074
2075    #[test]
2076    #[cfg(feature = "lz4")]
2077    fn test_write_empty_record_batch_lz4_compression() {
2078        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2079        let values: Vec<Option<i32>> = vec![];
2080        let array = Int32Array::from(values);
2081        let record_batch =
2082            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2083
2084        let mut file = tempfile::tempfile().unwrap();
2085
2086        {
2087            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2088                .unwrap()
2089                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
2090                .unwrap();
2091
2092            let mut writer =
2093                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2094            writer.write(&record_batch).unwrap();
2095            writer.finish().unwrap();
2096        }
2097        file.rewind().unwrap();
2098        {
2099            // read file
2100            let reader = FileReader::try_new(file, None).unwrap();
2101            for read_batch in reader {
2102                read_batch
2103                    .unwrap()
2104                    .columns()
2105                    .iter()
2106                    .zip(record_batch.columns())
2107                    .for_each(|(a, b)| {
2108                        assert_eq!(a.data_type(), b.data_type());
2109                        assert_eq!(a.len(), b.len());
2110                        assert_eq!(a.null_count(), b.null_count());
2111                    });
2112            }
2113        }
2114    }
2115
2116    #[test]
2117    #[cfg(feature = "lz4")]
2118    fn test_write_file_with_lz4_compression() {
2119        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2120        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
2121        let array = Int32Array::from(values);
2122        let record_batch =
2123            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2124
2125        let mut file = tempfile::tempfile().unwrap();
2126        {
2127            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2128                .unwrap()
2129                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
2130                .unwrap();
2131
2132            let mut writer =
2133                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2134            writer.write(&record_batch).unwrap();
2135            writer.finish().unwrap();
2136        }
2137        file.rewind().unwrap();
2138        {
2139            // read file
2140            let reader = FileReader::try_new(file, None).unwrap();
2141            for read_batch in reader {
2142                read_batch
2143                    .unwrap()
2144                    .columns()
2145                    .iter()
2146                    .zip(record_batch.columns())
2147                    .for_each(|(a, b)| {
2148                        assert_eq!(a.data_type(), b.data_type());
2149                        assert_eq!(a.len(), b.len());
2150                        assert_eq!(a.null_count(), b.null_count());
2151                    });
2152            }
2153        }
2154    }
2155
2156    #[test]
2157    #[cfg(feature = "zstd")]
2158    fn test_write_file_with_zstd_compression() {
2159        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2160        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
2161        let array = Int32Array::from(values);
2162        let record_batch =
2163            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2164        let mut file = tempfile::tempfile().unwrap();
2165        {
2166            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2167                .unwrap()
2168                .try_with_compression(Some(crate::CompressionType::ZSTD))
2169                .unwrap();
2170
2171            let mut writer =
2172                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2173            writer.write(&record_batch).unwrap();
2174            writer.finish().unwrap();
2175        }
2176        file.rewind().unwrap();
2177        {
2178            // read file
2179            let reader = FileReader::try_new(file, None).unwrap();
2180            for read_batch in reader {
2181                read_batch
2182                    .unwrap()
2183                    .columns()
2184                    .iter()
2185                    .zip(record_batch.columns())
2186                    .for_each(|(a, b)| {
2187                        assert_eq!(a.data_type(), b.data_type());
2188                        assert_eq!(a.len(), b.len());
2189                        assert_eq!(a.null_count(), b.null_count());
2190                    });
2191            }
2192        }
2193    }
2194
2195    #[test]
2196    fn test_write_file() {
2197        let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
2198        let values: Vec<Option<u32>> = vec![
2199            Some(999),
2200            None,
2201            Some(235),
2202            Some(123),
2203            None,
2204            None,
2205            None,
2206            None,
2207            None,
2208        ];
2209        let array1 = UInt32Array::from(values);
2210        let batch =
2211            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array1) as ArrayRef])
2212                .unwrap();
2213        let mut file = tempfile::tempfile().unwrap();
2214        {
2215            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2216
2217            writer.write(&batch).unwrap();
2218            writer.finish().unwrap();
2219        }
2220        file.rewind().unwrap();
2221
2222        {
2223            let mut reader = FileReader::try_new(file, None).unwrap();
2224            while let Some(Ok(read_batch)) = reader.next() {
2225                read_batch
2226                    .columns()
2227                    .iter()
2228                    .zip(batch.columns())
2229                    .for_each(|(a, b)| {
2230                        assert_eq!(a.data_type(), b.data_type());
2231                        assert_eq!(a.len(), b.len());
2232                        assert_eq!(a.null_count(), b.null_count());
2233                    });
2234            }
2235        }
2236    }
2237
2238    fn write_null_file(options: IpcWriteOptions) {
2239        let schema = Schema::new(vec![
2240            Field::new("nulls", DataType::Null, true),
2241            Field::new("int32s", DataType::Int32, false),
2242            Field::new("nulls2", DataType::Null, true),
2243            Field::new("f64s", DataType::Float64, false),
2244        ]);
2245        let array1 = NullArray::new(32);
2246        let array2 = Int32Array::from(vec![1; 32]);
2247        let array3 = NullArray::new(32);
2248        let array4 = Float64Array::from(vec![f64::NAN; 32]);
2249        let batch = RecordBatch::try_new(
2250            Arc::new(schema.clone()),
2251            vec![
2252                Arc::new(array1) as ArrayRef,
2253                Arc::new(array2) as ArrayRef,
2254                Arc::new(array3) as ArrayRef,
2255                Arc::new(array4) as ArrayRef,
2256            ],
2257        )
2258        .unwrap();
2259        let mut file = tempfile::tempfile().unwrap();
2260        {
2261            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2262
2263            writer.write(&batch).unwrap();
2264            writer.finish().unwrap();
2265        }
2266
2267        file.rewind().unwrap();
2268
2269        {
2270            let reader = FileReader::try_new(file, None).unwrap();
2271            reader.for_each(|maybe_batch| {
2272                maybe_batch
2273                    .unwrap()
2274                    .columns()
2275                    .iter()
2276                    .zip(batch.columns())
2277                    .for_each(|(a, b)| {
2278                        assert_eq!(a.data_type(), b.data_type());
2279                        assert_eq!(a.len(), b.len());
2280                        assert_eq!(a.null_count(), b.null_count());
2281                    });
2282            });
2283        }
2284    }
2285    #[test]
2286    fn test_write_null_file_v4() {
2287        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2288        write_null_file(IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap());
2289        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V4).unwrap());
2290        write_null_file(IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap());
2291    }
2292
2293    #[test]
2294    fn test_write_null_file_v5() {
2295        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2296        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V5).unwrap());
2297    }
2298
2299    #[test]
2300    fn track_union_nested_dict() {
2301        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2302
2303        let array = Arc::new(inner) as ArrayRef;
2304
2305        // Dict field with id 0
2306        #[allow(deprecated)]
2307        let dctfield = Field::new_dict("dict", array.data_type().clone(), false, 0, false);
2308        let union_fields = [(0, Arc::new(dctfield))].into_iter().collect();
2309
2310        let types = [0, 0, 0].into_iter().collect::<ScalarBuffer<i8>>();
2311        let offsets = [0, 1, 2].into_iter().collect::<ScalarBuffer<i32>>();
2312
2313        let union = UnionArray::try_new(union_fields, types, Some(offsets), vec![array]).unwrap();
2314
2315        let schema = Arc::new(Schema::new(vec![Field::new(
2316            "union",
2317            union.data_type().clone(),
2318            false,
2319        )]));
2320
2321        let r#gen = IpcDataGenerator::default();
2322        let mut dict_tracker = DictionaryTracker::new(false);
2323        r#gen.schema_to_bytes_with_dictionary_tracker(
2324            &schema,
2325            &mut dict_tracker,
2326            &IpcWriteOptions::default(),
2327        );
2328
2329        let batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
2330
2331        r#gen
2332            .encode(
2333                &batch,
2334                &mut dict_tracker,
2335                &Default::default(),
2336                &mut Default::default(),
2337            )
2338            .unwrap();
2339
2340        // The encoder will assign dict IDs itself to ensure uniqueness and ignore the dict ID in the schema
2341        // so we expect the dict will be keyed to 0
2342        assert!(dict_tracker.written.contains_key(&0));
2343    }
2344
2345    #[test]
2346    fn track_struct_nested_dict() {
2347        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2348
2349        let array = Arc::new(inner) as ArrayRef;
2350
2351        // Dict field with id 2
2352        #[allow(deprecated)]
2353        let dctfield = Arc::new(Field::new_dict(
2354            "dict",
2355            array.data_type().clone(),
2356            false,
2357            2,
2358            false,
2359        ));
2360
2361        let s = StructArray::from(vec![(dctfield, array)]);
2362        let struct_array = Arc::new(s) as ArrayRef;
2363
2364        let schema = Arc::new(Schema::new(vec![Field::new(
2365            "struct",
2366            struct_array.data_type().clone(),
2367            false,
2368        )]));
2369
2370        let r#gen = IpcDataGenerator::default();
2371        let mut dict_tracker = DictionaryTracker::new(false);
2372        r#gen.schema_to_bytes_with_dictionary_tracker(
2373            &schema,
2374            &mut dict_tracker,
2375            &IpcWriteOptions::default(),
2376        );
2377
2378        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2379
2380        r#gen
2381            .encode(
2382                &batch,
2383                &mut dict_tracker,
2384                &Default::default(),
2385                &mut Default::default(),
2386            )
2387            .unwrap();
2388
2389        assert!(dict_tracker.written.contains_key(&0));
2390    }
2391
2392    fn write_union_file(options: IpcWriteOptions) {
2393        let schema = Schema::new(vec![Field::new_union(
2394            "union",
2395            vec![0, 1],
2396            vec![
2397                Field::new("a", DataType::Int32, false),
2398                Field::new("c", DataType::Float64, false),
2399            ],
2400            UnionMode::Sparse,
2401        )]);
2402        let mut builder = UnionBuilder::with_capacity_sparse(5);
2403        builder.append::<Int32Type>("a", 1).unwrap();
2404        builder.append_null::<Int32Type>("a").unwrap();
2405        builder.append::<Float64Type>("c", 3.0).unwrap();
2406        builder.append_null::<Float64Type>("c").unwrap();
2407        builder.append::<Int32Type>("a", 4).unwrap();
2408        let union = builder.build().unwrap();
2409
2410        let batch =
2411            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(union) as ArrayRef])
2412                .unwrap();
2413
2414        let mut file = tempfile::tempfile().unwrap();
2415        {
2416            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2417
2418            writer.write(&batch).unwrap();
2419            writer.finish().unwrap();
2420        }
2421        file.rewind().unwrap();
2422
2423        {
2424            let reader = FileReader::try_new(file, None).unwrap();
2425            reader.for_each(|maybe_batch| {
2426                maybe_batch
2427                    .unwrap()
2428                    .columns()
2429                    .iter()
2430                    .zip(batch.columns())
2431                    .for_each(|(a, b)| {
2432                        assert_eq!(a.data_type(), b.data_type());
2433                        assert_eq!(a.len(), b.len());
2434                        assert_eq!(a.null_count(), b.null_count());
2435                    });
2436            });
2437        }
2438    }
2439
2440    #[test]
2441    fn test_write_union_file_v4_v5() {
2442        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2443        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2444    }
2445
2446    #[test]
2447    fn test_write_view_types() {
2448        const LONG_TEST_STRING: &str =
2449            "This is a long string to make sure binary view array handles it";
2450        let schema = Schema::new(vec![
2451            Field::new("field1", DataType::BinaryView, true),
2452            Field::new("field2", DataType::Utf8View, true),
2453        ]);
2454        let values: Vec<Option<&[u8]>> = vec![
2455            Some(b"foo"),
2456            Some(b"bar"),
2457            Some(LONG_TEST_STRING.as_bytes()),
2458        ];
2459        let binary_array = BinaryViewArray::from_iter(values);
2460        let utf8_array =
2461            StringViewArray::from_iter(vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]);
2462        let record_batch = RecordBatch::try_new(
2463            Arc::new(schema.clone()),
2464            vec![Arc::new(binary_array), Arc::new(utf8_array)],
2465        )
2466        .unwrap();
2467
2468        let mut file = tempfile::tempfile().unwrap();
2469        {
2470            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2471            writer.write(&record_batch).unwrap();
2472            writer.finish().unwrap();
2473        }
2474        file.rewind().unwrap();
2475        {
2476            let mut reader = FileReader::try_new(&file, None).unwrap();
2477            let read_batch = reader.next().unwrap().unwrap();
2478            read_batch
2479                .columns()
2480                .iter()
2481                .zip(record_batch.columns())
2482                .for_each(|(a, b)| {
2483                    assert_eq!(a, b);
2484                });
2485        }
2486        file.rewind().unwrap();
2487        {
2488            let mut reader = FileReader::try_new(&file, Some(vec![0])).unwrap();
2489            let read_batch = reader.next().unwrap().unwrap();
2490            assert_eq!(read_batch.num_columns(), 1);
2491            let read_array = read_batch.column(0);
2492            let write_array = record_batch.column(0);
2493            assert_eq!(read_array, write_array);
2494        }
2495    }
2496
2497    #[test]
2498    fn truncate_ipc_record_batch() {
2499        fn create_batch(rows: usize) -> RecordBatch {
2500            let schema = Schema::new(vec![
2501                Field::new("a", DataType::Int32, false),
2502                Field::new("b", DataType::Utf8, false),
2503            ]);
2504
2505            let a = Int32Array::from_iter_values(0..rows as i32);
2506            let b = StringArray::from_iter_values((0..rows).map(|i| i.to_string()));
2507
2508            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
2509        }
2510
2511        let big_record_batch = create_batch(65536);
2512
2513        let length = 5;
2514        let small_record_batch = create_batch(length);
2515
2516        let offset = 2;
2517        let record_batch_slice = big_record_batch.slice(offset, length);
2518        assert!(
2519            serialize_stream(&big_record_batch).len() > serialize_stream(&small_record_batch).len()
2520        );
2521        assert_eq!(
2522            serialize_stream(&small_record_batch).len(),
2523            serialize_stream(&record_batch_slice).len()
2524        );
2525
2526        assert_eq!(
2527            deserialize_stream(serialize_stream(&record_batch_slice)),
2528            record_batch_slice
2529        );
2530    }
2531
2532    #[test]
2533    fn truncate_ipc_record_batch_with_nulls() {
2534        fn create_batch() -> RecordBatch {
2535            let schema = Schema::new(vec![
2536                Field::new("a", DataType::Int32, true),
2537                Field::new("b", DataType::Utf8, true),
2538            ]);
2539
2540            let a = Int32Array::from(vec![Some(1), None, Some(1), None, Some(1)]);
2541            let b = StringArray::from(vec![None, Some("a"), Some("a"), None, Some("a")]);
2542
2543            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
2544        }
2545
2546        let record_batch = create_batch();
2547        let record_batch_slice = record_batch.slice(1, 2);
2548        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2549
2550        assert!(
2551            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2552        );
2553
2554        assert!(deserialized_batch.column(0).is_null(0));
2555        assert!(deserialized_batch.column(0).is_valid(1));
2556        assert!(deserialized_batch.column(1).is_valid(0));
2557        assert!(deserialized_batch.column(1).is_valid(1));
2558
2559        assert_eq!(record_batch_slice, deserialized_batch);
2560    }
2561
2562    #[test]
2563    fn truncate_ipc_dictionary_array() {
2564        fn create_batch() -> RecordBatch {
2565            let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
2566                .into_iter()
2567                .collect();
2568            let keys: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
2569
2570            let array = DictionaryArray::new(keys, Arc::new(values));
2571
2572            let schema = Schema::new(vec![Field::new("dict", array.data_type().clone(), true)]);
2573
2574            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
2575        }
2576
2577        let record_batch = create_batch();
2578        let record_batch_slice = record_batch.slice(1, 2);
2579        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2580
2581        assert!(
2582            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2583        );
2584
2585        assert!(deserialized_batch.column(0).is_valid(0));
2586        assert!(deserialized_batch.column(0).is_null(1));
2587
2588        assert_eq!(record_batch_slice, deserialized_batch);
2589    }
2590
2591    #[test]
2592    fn truncate_ipc_struct_array() {
2593        fn create_batch() -> RecordBatch {
2594            let strings: StringArray = [Some("foo"), None, Some("bar"), Some("baz")]
2595                .into_iter()
2596                .collect();
2597            let ints: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
2598
2599            let struct_array = StructArray::from(vec![
2600                (
2601                    Arc::new(Field::new("s", DataType::Utf8, true)),
2602                    Arc::new(strings) as ArrayRef,
2603                ),
2604                (
2605                    Arc::new(Field::new("c", DataType::Int32, true)),
2606                    Arc::new(ints) as ArrayRef,
2607                ),
2608            ]);
2609
2610            let schema = Schema::new(vec![Field::new(
2611                "struct_array",
2612                struct_array.data_type().clone(),
2613                true,
2614            )]);
2615
2616            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(struct_array)]).unwrap()
2617        }
2618
2619        let record_batch = create_batch();
2620        let record_batch_slice = record_batch.slice(1, 2);
2621        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2622
2623        assert!(
2624            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2625        );
2626
2627        let structs = deserialized_batch
2628            .column(0)
2629            .as_any()
2630            .downcast_ref::<StructArray>()
2631            .unwrap();
2632
2633        assert!(structs.column(0).is_null(0));
2634        assert!(structs.column(0).is_valid(1));
2635        assert!(structs.column(1).is_valid(0));
2636        assert!(structs.column(1).is_null(1));
2637        assert_eq!(record_batch_slice, deserialized_batch);
2638    }
2639
2640    #[test]
2641    fn truncate_ipc_string_array_with_all_empty_string() {
2642        fn create_batch() -> RecordBatch {
2643            let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
2644            let a = StringArray::from(vec![Some(""), Some(""), Some(""), Some(""), Some("")]);
2645            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap()
2646        }
2647
2648        let record_batch = create_batch();
2649        let record_batch_slice = record_batch.slice(0, 1);
2650        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2651
2652        assert!(
2653            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2654        );
2655        assert_eq!(record_batch_slice, deserialized_batch);
2656    }
2657
2658    #[test]
2659    fn test_stream_writer_writes_array_slice() {
2660        let array = UInt32Array::from(vec![Some(1), Some(2), Some(3)]);
2661        assert_eq!(
2662            vec![Some(1), Some(2), Some(3)],
2663            array.iter().collect::<Vec<_>>()
2664        );
2665
2666        let sliced = array.slice(1, 2);
2667        assert_eq!(vec![Some(2), Some(3)], sliced.iter().collect::<Vec<_>>());
2668
2669        let batch = RecordBatch::try_new(
2670            Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)])),
2671            vec![Arc::new(sliced)],
2672        )
2673        .expect("new batch");
2674
2675        let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
2676        writer.write(&batch).expect("write");
2677        let outbuf = writer.into_inner().expect("inner");
2678
2679        let mut reader = StreamReader::try_new(&outbuf[..], None).expect("new reader");
2680        let read_batch = reader.next().unwrap().expect("read batch");
2681
2682        let read_array: &UInt32Array = read_batch.column(0).as_primitive();
2683        assert_eq!(
2684            vec![Some(2), Some(3)],
2685            read_array.iter().collect::<Vec<_>>()
2686        );
2687    }
2688
2689    #[test]
2690    fn test_large_slice_uint32() {
2691        ensure_roundtrip(Arc::new(UInt32Array::from_iter(
2692            (0..8000).map(|i| if i % 2 == 0 { Some(i) } else { None }),
2693        )));
2694    }
2695
2696    #[test]
2697    fn test_large_slice_string() {
2698        let strings: Vec<_> = (0..8000)
2699            .map(|i| {
2700                if i % 2 == 0 {
2701                    Some(format!("value{i}"))
2702                } else {
2703                    None
2704                }
2705            })
2706            .collect();
2707
2708        ensure_roundtrip(Arc::new(StringArray::from(strings)));
2709    }
2710
2711    #[test]
2712    fn test_large_slice_string_list() {
2713        let mut ls = ListBuilder::new(StringBuilder::new());
2714
2715        let mut s = String::new();
2716        for row_number in 0..8000 {
2717            if row_number % 2 == 0 {
2718                for list_element in 0..1000 {
2719                    s.clear();
2720                    use std::fmt::Write;
2721                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
2722                    ls.values().append_value(&s);
2723                }
2724                ls.append(true)
2725            } else {
2726                ls.append(false); // null
2727            }
2728        }
2729
2730        ensure_roundtrip(Arc::new(ls.finish()));
2731    }
2732
2733    #[test]
2734    fn test_large_slice_string_list_of_lists() {
2735        // The reason for this special test is to verify reencode_offsets, which looks at both
2736        // the starting offset and the data offset, so we need a dataset where the starting
2737        // offset is zero but the data offset is not.
2738        let mut ls = ListBuilder::new(ListBuilder::new(StringBuilder::new()));
2739
2740        for _ in 0..4000 {
2741            ls.values().append(true);
2742            ls.append(true)
2743        }
2744
2745        let mut s = String::new();
2746        for row_number in 0..4000 {
2747            if row_number % 2 == 0 {
2748                for list_element in 0..1000 {
2749                    s.clear();
2750                    use std::fmt::Write;
2751                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
2752                    ls.values().values().append_value(&s);
2753                }
2754                ls.values().append(true);
2755                ls.append(true)
2756            } else {
2757                ls.append(false); // null
2758            }
2759        }
2760
2761        ensure_roundtrip(Arc::new(ls.finish()));
2762    }
2763
2764    /// Read/write a record batch to a File and Stream and ensure it is the same at the output
2765    fn ensure_roundtrip(array: ArrayRef) {
2766        let num_rows = array.len();
2767        let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
2768        // take off the first element
2769        let sliced_batch = orig_batch.slice(1, num_rows - 1);
2770
2771        let schema = orig_batch.schema();
2772        let stream_data = {
2773            let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
2774            writer.write(&sliced_batch).unwrap();
2775            writer.into_inner().unwrap()
2776        };
2777        let read_batch = {
2778            let projection = None;
2779            let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
2780            reader
2781                .next()
2782                .expect("expect no errors reading batch")
2783                .expect("expect batch")
2784        };
2785        assert_eq!(sliced_batch, read_batch);
2786
2787        let file_data = {
2788            let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
2789            writer.write(&sliced_batch).unwrap();
2790            writer.into_inner().unwrap().into_inner().unwrap()
2791        };
2792        let read_batch = {
2793            let projection = None;
2794            let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
2795            reader
2796                .next()
2797                .expect("expect no errors reading batch")
2798                .expect("expect batch")
2799        };
2800        assert_eq!(sliced_batch, read_batch);
2801
2802        // TODO test file writer/reader
2803    }
2804
2805    #[test]
2806    fn encode_bools_slice() {
2807        // Test case for https://github.com/apache/arrow-rs/issues/3496
2808        assert_bool_roundtrip([true, false], 1, 1);
2809
2810        // slice somewhere in the middle
2811        assert_bool_roundtrip(
2812            [
2813                true, false, true, true, false, false, true, true, true, false, false, false, true,
2814                true, true, true, false, false, false, false, true, true, true, true, true, false,
2815                false, false, false, false,
2816            ],
2817            13,
2818            17,
2819        );
2820
2821        // start at byte boundary, end in the middle
2822        assert_bool_roundtrip(
2823            [
2824                true, false, true, true, false, false, true, true, true, false, false, false,
2825            ],
2826            8,
2827            2,
2828        );
2829
2830        // start and stop at byte boundary
2831        assert_bool_roundtrip(
2832            [
2833                true, false, true, true, false, false, true, true, true, false, false, false, true,
2834                true, true, true, true, false, false, false, false, false,
2835            ],
2836            8,
2837            8,
2838        );
2839    }
2840
2841    fn assert_bool_roundtrip<const N: usize>(bools: [bool; N], offset: usize, length: usize) {
2842        let val_bool_field = Field::new("val", DataType::Boolean, false);
2843
2844        let schema = Arc::new(Schema::new(vec![val_bool_field]));
2845
2846        let bools = BooleanArray::from(bools.to_vec());
2847
2848        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(bools)]).unwrap();
2849        let batch = batch.slice(offset, length);
2850
2851        let data = serialize_stream(&batch);
2852        let batch2 = deserialize_stream(data);
2853        assert_eq!(batch, batch2);
2854    }
2855
2856    #[test]
2857    fn test_run_array_unslice() {
2858        let total_len = 80;
2859        let vals: Vec<Option<i32>> = vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
2860        let repeats: Vec<usize> = vec![3, 4, 1, 2];
2861        let mut input_array: Vec<Option<i32>> = Vec::with_capacity(total_len);
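        // `repeats` cycles [3, 4, 1, 2]; 32 iterations make eight full cycles of 3 + 4 + 1 + 2 = 10
        // values, so exactly total_len = 80 input values are generated below.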
2862        for ix in 0_usize..32 {
2863            let repeat: usize = repeats[ix % repeats.len()];
2864            let val: Option<i32> = vals[ix % vals.len()];
2865            input_array.resize(input_array.len() + repeat, val);
2866        }
2867
2868        // Encode the input_array to run array
2869        let mut builder =
2870            PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
2871        builder.extend(input_array.iter().copied());
2872        let run_array = builder.finish();
2873
2874        // test for all slice lengths.
2875        for slice_len in 1..=total_len {
2876            // test for offset = 0, slice length = slice_len
2877            let sliced_run_array: RunArray<Int16Type> =
2878                run_array.slice(0, slice_len).into_data().into();
2879
2880            // Create unsliced run array.
2881            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
2882            let typed = unsliced_run_array
2883                .downcast::<PrimitiveArray<Int32Type>>()
2884                .unwrap();
2885            let expected: Vec<Option<i32>> = input_array.iter().take(slice_len).copied().collect();
2886            let actual: Vec<Option<i32>> = typed.into_iter().collect();
2887            assert_eq!(expected, actual);
2888
2889            // test for offset = total_len - slice_len, length = slice_len
2890            let sliced_run_array: RunArray<Int16Type> = run_array
2891                .slice(total_len - slice_len, slice_len)
2892                .into_data()
2893                .into();
2894
2895            // Create unsliced run array.
2896            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
2897            let typed = unsliced_run_array
2898                .downcast::<PrimitiveArray<Int32Type>>()
2899                .unwrap();
2900            let expected: Vec<Option<i32>> = input_array
2901                .iter()
2902                .skip(total_len - slice_len)
2903                .copied()
2904                .collect();
2905            let actual: Vec<Option<i32>> = typed.into_iter().collect();
2906            assert_eq!(expected, actual);
2907        }
2908    }
2909
2910    fn generate_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
2911        let mut ls = GenericListBuilder::<O, _>::new(UInt32Builder::new());
2912
2913        for i in 0..100_000 {
2914            for value in [i, i, i] {
2915                ls.values().append_value(value);
2916            }
2917            ls.append(true)
2918        }
2919
2920        ls.finish()
2921    }
2922
2923    fn generate_nested_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
2924        let mut ls =
2925            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
2926
2927        for _i in 0..10_000 {
2928            for j in 0..10 {
2929                for value in [j, j, j, j] {
2930                    ls.values().values().append_value(value);
2931                }
2932                ls.values().append(true)
2933            }
2934            ls.append(true);
2935        }
2936
2937        ls.finish()
2938    }
2939
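    /// Like [`generate_nested_list_data`], but the first 999 rows contain only empty inner
    /// lists, so the innermost value offsets are still zero at row 999. Slicing there (as
    /// `roundtrip_ensure_sliced_smaller` does) exercises re-encoding nested offsets whose
    /// first child offset is already zero (see `reencode_offsets_when_first_offset_is_zero`).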
2940    fn generate_nested_list_data_starting_at_zero<O: OffsetSizeTrait>() -> GenericListArray<O> {
2941        let mut ls =
2942            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
2943
2944        for _i in 0..999 {
2945            ls.values().append(true);
2946            ls.append(true);
2947        }
2948
2949        for j in 0..10 {
2950            for value in [j, j, j, j] {
2951                ls.values().values().append_value(value);
2952            }
2953            ls.values().append(true)
2954        }
2955        ls.append(true);
2956
2957        for i in 0..9_000 {
2958            for j in 0..10 {
2959                for value in [i + j, i + j, i + j, i + j] {
2960                    ls.values().values().append_value(value);
2961                }
2962                ls.values().append(true)
2963            }
2964            ls.append(true);
2965        }
2966
2967        ls.finish()
2968    }
2969
2970    fn generate_map_array_data() -> MapArray {
2971        let keys_builder = UInt32Builder::new();
2972        let values_builder = UInt32Builder::new();
2973
2974        let mut builder = MapBuilder::new(None, keys_builder, values_builder);
2975
2976        for i in 0..100_000 {
2977            for _j in 0..3 {
2978                builder.keys().append_value(i);
2979                builder.values().append_value(i * 2);
2980            }
2981            builder.append(true).unwrap();
2982        }
2983
2984        builder.finish()
2985    }
2986
2987    #[test]
2988    fn reencode_offsets_when_first_offset_is_not_zero() {
2989        let original_list = generate_list_data::<i32>();
2990        let original_data = original_list.into_data();
2991        let slice_data = original_data.slice(75, 7);
2992        let (new_offsets, original_start, length) =
2993            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
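        // Each of the 7 sliced lists holds 3 values, so the slice maps to values starting at
        // 75 * 3 = 225 with total length 7 * 3 = 21, and the offsets are rebased to start at 0.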
2994        assert_eq!(
2995            vec![0, 3, 6, 9, 12, 15, 18, 21],
2996            new_offsets.typed_data::<i32>()
2997        );
2998        assert_eq!(225, original_start);
2999        assert_eq!(21, length);
3000    }
3001
3002    #[test]
3003    fn reencode_offsets_when_first_offset_is_zero() {
3004        let mut ls = GenericListBuilder::<i32, _>::new(UInt32Builder::new());
3005        // ls = [[], [35, 42]]
3006        ls.append(true);
3007        ls.values().append_value(35);
3008        ls.values().append_value(42);
3009        ls.append(true);
3010        let original_list = ls.finish();
3011        let original_data = original_list.into_data();
3012
3013        let slice_data = original_data.slice(1, 1);
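        // The slice selects only the list [35, 42]; because the preceding list is empty, the
        // slice's first offset is already 0 and the value range needs no rebasing.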
3014        let (new_offsets, original_start, length) =
3015            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
3016        assert_eq!(vec![0, 2], new_offsets.typed_data::<i32>());
3017        assert_eq!(0, original_start);
3018        assert_eq!(2, length);
3019    }
3020
3021    /// Ensure that both the full and sliced versions round-trip through serde equal to the original input.
3022    /// Also ensure the serialized sliced version is significantly smaller than the serialized full version.
3023    fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {
3024        // test both full and sliced versions
3025        let in_sliced = in_batch.slice(999, 1);
3026
3027        let bytes_batch = serialize_file(&in_batch);
3028        let bytes_sliced = serialize_file(&in_sliced);
3029
3030        // serializing 1 row should be significantly smaller than serializing the full batch
3031        assert!(bytes_sliced.len() < (bytes_batch.len() / expected_size_factor));
3032
3033        // ensure both are still valid and equal to originals
3034        let out_batch = deserialize_file(bytes_batch);
3035        assert_eq!(in_batch, out_batch);
3036
3037        let out_sliced = deserialize_file(bytes_sliced);
3038        assert_eq!(in_sliced, out_sliced);
3039    }
3040
3041    #[test]
3042    fn encode_lists() {
3043        let val_inner = Field::new_list_field(DataType::UInt32, true);
3044        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
3045        let schema = Arc::new(Schema::new(vec![val_list_field]));
3046
3047        let values = Arc::new(generate_list_data::<i32>());
3048
3049        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3050        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3051    }
3052
3053    #[test]
3054    fn encode_empty_list() {
3055        let val_inner = Field::new_list_field(DataType::UInt32, true);
3056        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
3057        let schema = Arc::new(Schema::new(vec![val_list_field]));
3058
3059        let values = Arc::new(generate_list_data::<i32>());
3060
3061        let in_batch = RecordBatch::try_new(schema, vec![values])
3062            .unwrap()
3063            .slice(999, 0);
3064        let out_batch = deserialize_file(serialize_file(&in_batch));
3065        assert_eq!(in_batch, out_batch);
3066    }
3067
3068    #[test]
3069    fn encode_large_lists() {
3070        let val_inner = Field::new_list_field(DataType::UInt32, true);
3071        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3072        let schema = Arc::new(Schema::new(vec![val_list_field]));
3073
3074        let values = Arc::new(generate_list_data::<i64>());
3075
3076        // ensure both the full and sliced versions round-trip equal to the original input
3077        // and that the serialized sliced version is significantly smaller than the full one
3078        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3079        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3080    }
3081
3082    #[test]
3083    fn encode_nested_lists() {
3084        let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
3085        let inner_list_field = Arc::new(Field::new_list_field(DataType::List(inner_int), true));
3086        let list_field = Field::new("val", DataType::List(inner_list_field), true);
3087        let schema = Arc::new(Schema::new(vec![list_field]));
3088
3089        let values = Arc::new(generate_nested_list_data::<i32>());
3090
3091        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3092        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3093    }
3094
3095    #[test]
3096    fn encode_nested_lists_starting_at_zero() {
3097        let inner_int = Arc::new(Field::new("item", DataType::UInt32, true));
3098        let inner_list_field = Arc::new(Field::new("item", DataType::List(inner_int), true));
3099        let list_field = Field::new("val", DataType::List(inner_list_field), true);
3100        let schema = Arc::new(Schema::new(vec![list_field]));
3101
3102        let values = Arc::new(generate_nested_list_data_starting_at_zero::<i32>());
3103
3104        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3105        roundtrip_ensure_sliced_smaller(in_batch, 1);
3106    }
3107
3108    #[test]
3109    fn encode_map_array() {
3110        let keys = Arc::new(Field::new("keys", DataType::UInt32, false));
3111        let values = Arc::new(Field::new("values", DataType::UInt32, true));
3112        let map_field = Field::new_map("map", "entries", keys, values, false, true);
3113        let schema = Arc::new(Schema::new(vec![map_field]));
3114
3115        let values = Arc::new(generate_map_array_data());
3116
3117        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3118        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3119    }
3120
3121    #[test]
3122    fn test_decimal128_alignment16_is_sufficient() {
3123        const IPC_ALIGNMENT: usize = 16;
3124
3125        // Test a bunch of different dimensions to ensure alignment is never an issue.
3126        // For example, if we only test `num_cols = 1` then even with alignment 8 this
3127        // test would _happen_ to pass, even though for different dimensions like
3128        // `num_cols = 2` it would fail.
3129        for num_cols in [1, 2, 3, 17, 50, 73, 99] {
3130            let num_rows = (num_cols * 7 + 11) % 100; // Deterministic swizzle
3131
3132            let mut fields = Vec::new();
3133            let mut arrays = Vec::new();
3134            for i in 0..num_cols {
3135                let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
3136                let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
3137                fields.push(field);
3138                arrays.push(Arc::new(array) as Arc<dyn Array>);
3139            }
3140            let schema = Schema::new(fields);
3141            let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();
3142
3143            let mut writer = FileWriter::try_new_with_options(
3144                Vec::new(),
3145                batch.schema_ref(),
3146                IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
3147            )
3148            .unwrap();
3149            writer.write(&batch).unwrap();
3150            writer.finish().unwrap();
3151
3152            let out: Vec<u8> = writer.into_inner().unwrap();
3153
3154            let buffer = Buffer::from_vec(out);
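            // An IPC file ends with the footer, a 4-byte little-endian footer length, and the
            // 6-byte magic "ARROW1", so the trailer starts 10 bytes before the end of the file.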
3155            let trailer_start = buffer.len() - 10;
3156            let footer_len =
3157                read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
3158            let footer =
3159                root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
3160
3161            let schema = fb_to_schema(footer.schema().unwrap());
3162
3163            // Importantly we set `require_alignment`, checking that 16-byte alignment is sufficient
3164            // for `read_record_batch` later on to read the data in a zero-copy manner.
3165            let decoder =
3166                FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);
3167
3168            let batches = footer.recordBatches().unwrap();
3169
3170            let block = batches.get(0);
3171            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
3172            let data = buffer.slice_with_length(block.offset() as _, block_len);
3173
3174            let batch2 = decoder.read_record_batch(block, &data).unwrap().unwrap();
3175
3176            assert_eq!(batch, batch2);
3177        }
3178    }
3179
3180    #[test]
3181    fn test_decimal128_alignment8_is_unaligned() {
3182        const IPC_ALIGNMENT: usize = 8;
3183
3184        let num_cols = 2;
3185        let num_rows = 1;
3186
3187        let mut fields = Vec::new();
3188        let mut arrays = Vec::new();
3189        for i in 0..num_cols {
3190            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
3191            let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
3192            fields.push(field);
3193            arrays.push(Arc::new(array) as Arc<dyn Array>);
3194        }
3195        let schema = Schema::new(fields);
3196        let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();
3197
3198        let mut writer = FileWriter::try_new_with_options(
3199            Vec::new(),
3200            batch.schema_ref(),
3201            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
3202        )
3203        .unwrap();
3204        writer.write(&batch).unwrap();
3205        writer.finish().unwrap();
3206
3207        let out: Vec<u8> = writer.into_inner().unwrap();
3208
3209        let buffer = Buffer::from_vec(out);
3210        let trailer_start = buffer.len() - 10;
3211        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
3212        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
3213        let schema = fb_to_schema(footer.schema().unwrap());
3214
3215        // Importantly we set `require_alignment`, otherwise the error later is suppressed due to copying
3216        // to an aligned buffer in `ArrayDataBuilder.build_aligned`.
3217        let decoder =
3218            FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);
3219
3220        let batches = footer.recordBatches().unwrap();
3221
3222        let block = batches.get(0);
3223        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
3224        let data = buffer.slice_with_length(block.offset() as _, block_len);
3225
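        // With 8-byte alignment, the second Decimal128 buffer lands 8 bytes off the 16-byte
        // alignment its layout requires, so strict decoding is expected to fail below.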
3226        let result = decoder.read_record_batch(block, &data);
3227
3228        let error = result.unwrap_err();
3229        assert_eq!(
3230            error.to_string(),
3231            "Invalid argument error: Misaligned buffers[0] in array of type Decimal128(38, 10), \
3232             offset from expected alignment of 16 by 8"
3233        );
3234    }
3235
3236    #[test]
3237    fn test_flush() {
3238        // We write a schema which is small enough to fit into a buffer and not get flushed,
3239        // and then force the write with .flush().
3240        let num_cols = 2;
3241        let mut fields = Vec::new();
3242        let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
3243        for i in 0..num_cols {
3244            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
3245            fields.push(field);
3246        }
3247        let schema = Schema::new(fields);
3248        let inner_stream_writer = BufWriter::with_capacity(1024, Vec::new());
3249        let inner_file_writer = BufWriter::with_capacity(1024, Vec::new());
3250        let mut stream_writer =
3251            StreamWriter::try_new_with_options(inner_stream_writer, &schema, options.clone())
3252                .unwrap();
3253        let mut file_writer =
3254            FileWriter::try_new_with_options(inner_file_writer, &schema, options).unwrap();
3255
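        // get_ref().get_ref() reaches through the writer and its BufWriter to the underlying
        // Vec, whose length is the number of bytes flushed to it so far.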
3256        let stream_bytes_written_on_new = stream_writer.get_ref().get_ref().len();
3257        let file_bytes_written_on_new = file_writer.get_ref().get_ref().len();
3258        stream_writer.flush().unwrap();
3259        file_writer.flush().unwrap();
3260        let stream_bytes_written_on_flush = stream_writer.get_ref().get_ref().len();
3261        let file_bytes_written_on_flush = file_writer.get_ref().get_ref().len();
3262        let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
3263        // Finishing a stream writes the continuation bytes in MetadataVersion::V5 (4 bytes)
3264        // and then a length of 0 (4 bytes) for a total of 8 bytes.
3265        // Everything before that should have been flushed in the .flush() call.
3266        let expected_stream_flushed_bytes = stream_out.len() - 8;
3267        // A file write is the same as the stream write except for the leading magic string
3268        // "ARROW1" (6 bytes) plus 2 bytes of padding, 8 bytes in total.
3269        let expected_file_flushed_bytes = expected_stream_flushed_bytes + 8;
3270
3271        assert!(
3272            stream_bytes_written_on_new < stream_bytes_written_on_flush,
3273            "this test makes no sense if flush is not actually required"
3274        );
3275        assert!(
3276            file_bytes_written_on_new < file_bytes_written_on_flush,
3277            "this test makes no sense if flush is not actually required"
3278        );
3279        assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
3280        assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
3281    }
3282
3283    #[test]
3284    fn test_roundtrip_list_of_fixed_list() -> Result<(), ArrowError> {
3285        let l1_type =
3286            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, false)), 3);
3287        let l2_type = DataType::List(Arc::new(Field::new("item", l1_type.clone(), false)));
3288
3289        let l0_builder = Float32Builder::new();
3290        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3).with_field(Arc::new(Field::new(
3291            "item",
3292            DataType::Float32,
3293            false,
3294        )));
3295        let mut l2_builder =
3296            ListBuilder::new(l1_builder).with_field(Arc::new(Field::new("item", l1_type, false)));
3297
3298        for point in [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]] {
3299            l2_builder.values().values().append_value(point[0]);
3300            l2_builder.values().values().append_value(point[1]);
3301            l2_builder.values().values().append_value(point[2]);
3302
3303            l2_builder.values().append(true);
3304        }
3305        l2_builder.append(true);
3306
3307        let point = [10., 11., 12.];
3308        l2_builder.values().values().append_value(point[0]);
3309        l2_builder.values().values().append_value(point[1]);
3310        l2_builder.values().values().append_value(point[2]);
3311
3312        l2_builder.values().append(true);
3313        l2_builder.append(true);
3314
3315        let array = Arc::new(l2_builder.finish()) as ArrayRef;
3316
3317        let schema = Arc::new(Schema::new_with_metadata(
3318            vec![Field::new("points", l2_type, false)],
3319            HashMap::default(),
3320        ));
3321
3322        // Test a variety of combinations that include 0 and non-zero offsets
3323        // and also slices that cover only part of the array or the rest of it
3324        test_slices(&array, &schema, 0, 1)?;
3325        test_slices(&array, &schema, 0, 2)?;
3326        test_slices(&array, &schema, 1, 1)?;
3327
3328        Ok(())
3329    }
3330
3331    #[test]
3332    fn test_roundtrip_list_of_fixed_list_w_nulls() -> Result<(), ArrowError> {
3333        let l0_builder = Float32Builder::new();
3334        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3);
3335        let mut l2_builder = ListBuilder::new(l1_builder);
3336
3337        for point in [
3338            [Some(1.0), Some(2.0), None],
3339            [Some(4.0), Some(5.0), Some(6.0)],
3340            [None, Some(8.0), Some(9.0)],
3341        ] {
3342            for p in point {
3343                match p {
3344                    Some(p) => l2_builder.values().values().append_value(p),
3345                    None => l2_builder.values().values().append_null(),
3346                }
3347            }
3348
3349            l2_builder.values().append(true);
3350        }
3351        l2_builder.append(true);
3352
3353        let point = [Some(10.), None, None];
3354        for p in point {
3355            match p {
3356                Some(p) => l2_builder.values().values().append_value(p),
3357                None => l2_builder.values().values().append_null(),
3358            }
3359        }
3360
3361        l2_builder.values().append(true);
3362        l2_builder.append(true);
3363
3364        let array = Arc::new(l2_builder.finish()) as ArrayRef;
3365
3366        let schema = Arc::new(Schema::new_with_metadata(
3367            vec![Field::new(
3368                "points",
3369                DataType::List(Arc::new(Field::new(
3370                    "item",
3371                    DataType::FixedSizeList(
3372                        Arc::new(Field::new("item", DataType::Float32, true)),
3373                        3,
3374                    ),
3375                    true,
3376                ))),
3377                true,
3378            )],
3379            HashMap::default(),
3380        ));
3381
3382        // Test a variety of combinations that include 0 and non-zero offsets
3383        // and also slices that cover only part of the array or the rest of it
3384        test_slices(&array, &schema, 0, 1)?;
3385        test_slices(&array, &schema, 0, 2)?;
3386        test_slices(&array, &schema, 1, 1)?;
3387
3388        Ok(())
3389    }
3390
3391    fn test_slices(
3392        parent_array: &ArrayRef,
3393        schema: &SchemaRef,
3394        offset: usize,
3395        length: usize,
3396    ) -> Result<(), ArrowError> {
3397        let subarray = parent_array.slice(offset, length);
3398        let original_batch = RecordBatch::try_new(schema.clone(), vec![subarray])?;
3399
3400        let mut bytes = Vec::new();
3401        let mut writer = StreamWriter::try_new(&mut bytes, schema)?;
3402        writer.write(&original_batch)?;
3403        writer.finish()?;
3404
3405        let mut cursor = std::io::Cursor::new(bytes);
3406        let mut reader = StreamReader::try_new(&mut cursor, None)?;
3407        let returned_batch = reader.next().unwrap()?;
3408
3409        assert_eq!(original_batch, returned_batch);
3410
3411        Ok(())
3412    }
3413
3414    #[test]
3415    fn test_roundtrip_fixed_list() -> Result<(), ArrowError> {
3416        let int_builder = Int64Builder::new();
3417        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3)
3418            .with_field(Arc::new(Field::new("item", DataType::Int64, false)));
3419
3420        for point in [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]] {
3421            fixed_list_builder.values().append_value(point[0]);
3422            fixed_list_builder.values().append_value(point[1]);
3423            fixed_list_builder.values().append_value(point[2]);
3424
3425            fixed_list_builder.append(true);
3426        }
3427
3428        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;
3429
3430        let schema = Arc::new(Schema::new_with_metadata(
3431            vec![Field::new(
3432                "points",
3433                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, false)), 3),
3434                false,
3435            )],
3436            HashMap::default(),
3437        ));
3438
3439        // Test a variety of combinations that include 0 and non-zero offsets
3440        // and also slices that cover only part of the array or the rest of it
3441        test_slices(&array, &schema, 0, 4)?;
3442        test_slices(&array, &schema, 0, 2)?;
3443        test_slices(&array, &schema, 1, 3)?;
3444        test_slices(&array, &schema, 2, 1)?;
3445
3446        Ok(())
3447    }
3448
3449    #[test]
3450    fn test_roundtrip_fixed_list_w_nulls() -> Result<(), ArrowError> {
3451        let int_builder = Int64Builder::new();
3452        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3);
3453
3454        for point in [
3455            [Some(1), Some(2), None],
3456            [Some(4), Some(5), Some(6)],
3457            [None, Some(8), Some(9)],
3458            [Some(10), None, None],
3459        ] {
3460            for p in point {
3461                match p {
3462                    Some(p) => fixed_list_builder.values().append_value(p),
3463                    None => fixed_list_builder.values().append_null(),
3464                }
3465            }
3466
3467            fixed_list_builder.append(true);
3468        }
3469
3470        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;
3471
3472        let schema = Arc::new(Schema::new_with_metadata(
3473            vec![Field::new(
3474                "points",
3475                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 3),
3476                true,
3477            )],
3478            HashMap::default(),
3479        ));
3480
3481        // Test a variety of combinations that include 0 and non-zero offsets
3482        // and also slices that cover only part of the array or the rest of it
3483        test_slices(&array, &schema, 0, 4)?;
3484        test_slices(&array, &schema, 0, 2)?;
3485        test_slices(&array, &schema, 1, 3)?;
3486        test_slices(&array, &schema, 2, 1)?;
3487
3488        Ok(())
3489    }
3490
3491    #[test]
3492    fn test_metadata_encoding_ordering() {
3493        fn create_hash() -> u64 {
3494            let metadata: HashMap<String, String> = [
3495                ("a", "1"), //
3496                ("b", "2"), //
3497                ("c", "3"), //
3498                ("d", "4"), //
3499                ("e", "5"), //
3500            ]
3501            .into_iter()
3502            .map(|(k, v)| (k.to_owned(), v.to_owned()))
3503            .collect();
3504
3505            // Set metadata on both the schema and a field within it.
3506            let schema = Arc::new(
3507                Schema::new(vec![
3508                    Field::new("a", DataType::Int64, true).with_metadata(metadata.clone()),
3509                ])
3510                .with_metadata(metadata)
3511                .clone(),
3512            );
3513            let batch = RecordBatch::new_empty(schema.clone());
3514
3515            let mut bytes = Vec::new();
3516            let mut w = StreamWriter::try_new(&mut bytes, batch.schema_ref()).unwrap();
3517            w.write(&batch).unwrap();
3518            w.finish().unwrap();
3519
3520            let mut h = std::hash::DefaultHasher::new();
3521            h.write(&bytes);
3522            h.finish()
3523        }
3524
3525        let expected = create_hash();
3526
3527        // HashMap iteration order is not deterministic and we cannot specify our
3528        // own Hasher for the implementation used for metadata, so run the above
3529        // code 20x and verify the serialized bytes do not change. This is not a
3530        // perfect guarantee, but it should be good enough.
3531        let all_passed = (0..20).all(|_| create_hash() == expected);
3532        assert!(all_passed);
3533    }
3534}