// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Arrow IPC File and Stream Writers
//!
//! # Notes
//!
//! [`FileWriter`] and [`StreamWriter`] have similar interfaces,
//! however the [`FileWriter`] produces the IPC file format, which must be read
//! back with a reader that supports [`Seek`]ing, whereas the stream format
//! written by [`StreamWriter`] can be read sequentially.
//!
//! [`Seek`]: std::io::Seek

use std::cmp::min;
use std::collections::HashMap;
use std::io::{BufWriter, Write};
use std::mem::size_of;
use std::sync::Arc;

use flatbuffers::FlatBufferBuilder;

use arrow_array::builder::BufferBuilder;
use arrow_array::cast::*;
use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
use arrow_array::*;
use arrow_buffer::bit_util;
use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
use arrow_data::{layout, ArrayData, ArrayDataBuilder, BufferSpec};
use arrow_schema::*;

use crate::compression::CompressionCodec;
use crate::convert::IpcSchemaEncoder;
use crate::CONTINUATION_MARKER;

/// IPC write options used to control the behaviour of the [`IpcDataGenerator`]
#[derive(Debug, Clone)]
pub struct IpcWriteOptions {
    /// Write padding after memory buffers to this multiple of bytes.
    /// Must be 8, 16, 32, or 64 - defaults to 64.
    alignment: u8,
    /// The legacy format is for releases before 0.15.0, and uses metadata V4
    write_legacy_ipc_format: bool,
    /// The metadata version to write. The Rust IPC writer supports V4+
    ///
    /// *Default versions per crate*
    ///
    /// When creating the default IpcWriteOptions, the following metadata versions are used:
    ///
    /// * version 2.0.0: V4, with legacy format enabled
    /// * version 4.0.0: V5
    metadata_version: crate::MetadataVersion,
    /// Compression, if desired. Will result in a runtime error
    /// if the corresponding feature is not enabled
    batch_compression_type: Option<crate::CompressionType>,
}

impl IpcWriteOptions {
    /// Configures compression when writing IPC files.
    ///
    /// Will result in a runtime error if the corresponding feature
    /// is not enabled
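    ///
    /// # Example
    ///
    /// A minimal sketch of enabling batch compression; the codec shown here is
    /// illustrative, and actually writing compressed data also requires the
    /// matching codec feature (e.g. `zstd`) to be enabled:
    ///
    /// ```
    /// # use arrow_ipc::writer::IpcWriteOptions;
    /// # use arrow_ipc::CompressionType;
    /// // The default options use metadata V5, which supports compression
    /// let options = IpcWriteOptions::default()
    ///     .try_with_compression(Some(CompressionType::ZSTD))
    ///     .unwrap();
    /// ```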
    pub fn try_with_compression(
        mut self,
        batch_compression_type: Option<crate::CompressionType>,
    ) -> Result<Self, ArrowError> {
        self.batch_compression_type = batch_compression_type;

        if self.batch_compression_type.is_some()
            && self.metadata_version < crate::MetadataVersion::V5
        {
            return Err(ArrowError::InvalidArgumentError(
                "Compression only supported in metadata v5 and above".to_string(),
            ));
        }
        Ok(self)
    }

    /// Try to create IpcWriteOptions, checking for incompatible settings
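    ///
    /// # Example
    ///
    /// A brief sketch of the accepted settings; alignments other than 8, 16,
    /// 32 or 64 are rejected:
    ///
    /// ```
    /// # use arrow_ipc::writer::IpcWriteOptions;
    /// # use arrow_ipc::MetadataVersion;
    /// // 8-byte alignment, modern (non-legacy) format, metadata V5
    /// let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
    /// // An unsupported alignment returns an error
    /// assert!(IpcWriteOptions::try_new(3, false, MetadataVersion::V5).is_err());
    /// ```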
    pub fn try_new(
        alignment: usize,
        write_legacy_ipc_format: bool,
        metadata_version: crate::MetadataVersion,
    ) -> Result<Self, ArrowError> {
        let is_alignment_valid =
            alignment == 8 || alignment == 16 || alignment == 32 || alignment == 64;
        if !is_alignment_valid {
            return Err(ArrowError::InvalidArgumentError(
                "Alignment should be 8, 16, 32, or 64.".to_string(),
            ));
        }
        let alignment: u8 = u8::try_from(alignment).expect("range already checked");
        match metadata_version {
            crate::MetadataVersion::V1
            | crate::MetadataVersion::V2
            | crate::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
                "Writing IPC metadata version 3 and lower not supported".to_string(),
            )),
            #[allow(deprecated)]
            crate::MetadataVersion::V4 => Ok(Self {
                alignment,
                write_legacy_ipc_format,
                metadata_version,
                batch_compression_type: None,
            }),
            crate::MetadataVersion::V5 => {
                if write_legacy_ipc_format {
                    Err(ArrowError::InvalidArgumentError(
                        "Legacy IPC format only supported on metadata version 4".to_string(),
                    ))
                } else {
                    Ok(Self {
                        alignment,
                        write_legacy_ipc_format,
                        metadata_version,
                        batch_compression_type: None,
                    })
                }
            }
            z => Err(ArrowError::InvalidArgumentError(format!(
                "Unsupported crate::MetadataVersion {z:?}"
            ))),
        }
    }
}

impl Default for IpcWriteOptions {
    fn default() -> Self {
        Self {
            alignment: 64,
            write_legacy_ipc_format: false,
            metadata_version: crate::MetadataVersion::V5,
            batch_compression_type: None,
        }
    }
}

#[derive(Debug, Default)]
/// Handles low level details of encoding [`Array`] and [`Schema`] into the
/// [Arrow IPC Format].
///
/// # Example
/// ```
/// # fn run() {
/// # use std::sync::Arc;
/// # use arrow_array::UInt64Array;
/// # use arrow_array::RecordBatch;
/// # use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
///
/// // Create a record batch
/// let batch = RecordBatch::try_from_iter(vec![
///  ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _)
/// ]).unwrap();
///
/// // Error if dictionary ids are replaced.
/// let error_on_replacement = true;
/// let options = IpcWriteOptions::default();
/// let mut dictionary_tracker = DictionaryTracker::new(error_on_replacement);
///
/// // encode the batch into zero or more encoded dictionaries
/// // and the data for the actual array.
/// let data_gen = IpcDataGenerator::default();
/// let (encoded_dictionaries, encoded_message) = data_gen
///   .encoded_batch(&batch, &mut dictionary_tracker, &options)
///   .unwrap();
/// # }
/// ```
///
/// [Arrow IPC Format]: https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc
pub struct IpcDataGenerator {}

impl IpcDataGenerator {
    /// Converts a schema to an IPC message along with `dictionary_tracker`
    /// and returns it encoded inside [EncodedData] as a flatbuffer.
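    ///
    /// # Example
    ///
    /// A minimal sketch (the schema below is illustrative): the returned
    /// [EncodedData] carries the flatbuffer-encoded message, and its body is
    /// empty because schema messages contain no Arrow data.
    ///
    /// ```
    /// # use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
    /// # use arrow_schema::{DataType, Field, Schema};
    /// let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    /// let options = IpcWriteOptions::default();
    /// let mut tracker = DictionaryTracker::new(false);
    /// let encoded = IpcDataGenerator::default()
    ///     .schema_to_bytes_with_dictionary_tracker(&schema, &mut tracker, &options);
    /// assert!(!encoded.ipc_message.is_empty());
    /// assert!(encoded.arrow_data.is_empty());
    /// ```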
    pub fn schema_to_bytes_with_dictionary_tracker(
        &self,
        schema: &Schema,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
    ) -> EncodedData {
        let mut fbb = FlatBufferBuilder::new();
        let schema = {
            let fb = IpcSchemaEncoder::new()
                .with_dictionary_tracker(dictionary_tracker)
                .schema_to_fb_offset(&mut fbb, schema);
            fb.as_union_value()
        };

        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::Schema);
        message.add_bodyLength(0);
        message.add_header(schema);
        // TODO: custom metadata
        let data = message.finish();
        fbb.finish(data, None);

        let data = fbb.finished_data();
        EncodedData {
            ipc_message: data.to_vec(),
            arrow_data: vec![],
        }
    }

    fn _encode_dictionaries<I: Iterator<Item = i64>>(
        &self,
        column: &ArrayRef,
        encoded_dictionaries: &mut Vec<EncodedData>,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
        dict_id: &mut I,
    ) -> Result<(), ArrowError> {
        match column.data_type() {
            DataType::Struct(fields) => {
                let s = as_struct_array(column);
                for (field, column) in fields.iter().zip(s.columns()) {
                    self.encode_dictionaries(
                        field,
                        column,
                        encoded_dictionaries,
                        dictionary_tracker,
                        write_options,
                        dict_id,
                    )?;
                }
            }
            DataType::RunEndEncoded(_, values) => {
                let data = column.to_data();
                if data.child_data().len() != 2 {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "The run encoded array should have exactly two child arrays. Found {}",
                        data.child_data().len()
                    )));
                }
                // The run_ends array is not expected to be dictionary encoded,
                // hence encode dictionaries only for the values array.
                let values_array = make_array(data.child_data()[1].clone());
                self.encode_dictionaries(
                    values,
                    &values_array,
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::List(field) => {
                let list = as_list_array(column);
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::LargeList(field) => {
                let list = as_large_list_array(column);
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::FixedSizeList(field, _) => {
                let list = column
                    .as_any()
                    .downcast_ref::<FixedSizeListArray>()
                    .expect("Unable to downcast to fixed size list array");
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::Map(field, _) => {
                let map_array = as_map_array(column);

                let (keys, values) = match field.data_type() {
                    DataType::Struct(fields) if fields.len() == 2 => (&fields[0], &fields[1]),
                    _ => panic!("Incorrect field data type {:?}", field.data_type()),
                };

                // keys
                self.encode_dictionaries(
                    keys,
                    map_array.keys(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;

                // values
                self.encode_dictionaries(
                    values,
                    map_array.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::Union(fields, _) => {
                let union = as_union_array(column);
                for (type_id, field) in fields.iter() {
                    let column = union.child(type_id);
                    self.encode_dictionaries(
                        field,
                        column,
                        encoded_dictionaries,
                        dictionary_tracker,
                        write_options,
                        dict_id,
                    )?;
                }
            }
            _ => (),
        }

        Ok(())
    }

    fn encode_dictionaries<I: Iterator<Item = i64>>(
        &self,
        field: &Field,
        column: &ArrayRef,
        encoded_dictionaries: &mut Vec<EncodedData>,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
        dict_id_seq: &mut I,
    ) -> Result<(), ArrowError> {
        match column.data_type() {
            DataType::Dictionary(_key_type, _value_type) => {
                let dict_data = column.to_data();
                let dict_values = &dict_data.child_data()[0];

                let values = make_array(dict_data.child_data()[0].clone());

                self._encode_dictionaries(
                    &values,
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id_seq,
                )?;

                // It's important to only take the dict_id at this point, because the dict ID
                // sequence is assigned depth-first, so we need to first encode children and have
                // them take their assigned dict IDs before we take the dict ID for this field.
                let dict_id = dict_id_seq.next().ok_or_else(|| {
                    ArrowError::IpcError(format!("no dict id for field {}", field.name()))
                })?;

                let emit = dictionary_tracker.insert(dict_id, column)?;

                if emit {
                    encoded_dictionaries.push(self.dictionary_batch_to_bytes(
                        dict_id,
                        dict_values,
                        write_options,
                    )?);
                }
            }
            _ => self._encode_dictionaries(
                column,
                encoded_dictionaries,
                dictionary_tracker,
                write_options,
                dict_id_seq,
            )?,
        }

        Ok(())
    }

    /// Encodes a batch to a number of [EncodedData] items (dictionary batches + the record batch).
    /// The [DictionaryTracker] keeps track of dictionaries with new `dict_id`s (so they are only sent once).
    /// Make sure the [DictionaryTracker] is initialized at the start of the stream.
    pub fn encoded_batch(
        &self,
        batch: &RecordBatch,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
        let schema = batch.schema();
        let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len());

        let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter();

        for (i, field) in schema.fields().iter().enumerate() {
            let column = batch.column(i);
            self.encode_dictionaries(
                field,
                column,
                &mut encoded_dictionaries,
                dictionary_tracker,
                write_options,
                &mut dict_id,
            )?;
        }

        let encoded_message = self.record_batch_to_bytes(batch, write_options)?;
        Ok((encoded_dictionaries, encoded_message))
    }

    /// Write a `RecordBatch` into two sets of bytes, one for the header (crate::Message) and the
    /// other for the batch's data
    fn record_batch_to_bytes(
        &self,
        batch: &RecordBatch,
        write_options: &IpcWriteOptions,
    ) -> Result<EncodedData, ArrowError> {
        let mut fbb = FlatBufferBuilder::new();

        let mut nodes: Vec<crate::FieldNode> = vec![];
        let mut buffers: Vec<crate::Buffer> = vec![];
        let mut arrow_data: Vec<u8> = vec![];
        let mut offset = 0;

        // get the type of compression
        let batch_compression_type = write_options.batch_compression_type;

        let compression = batch_compression_type.map(|batch_compression_type| {
            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
            c.add_method(crate::BodyCompressionMethod::BUFFER);
            c.add_codec(batch_compression_type);
            c.finish()
        });

        let compression_codec: Option<CompressionCodec> =
            batch_compression_type.map(TryInto::try_into).transpose()?;

        let mut variadic_buffer_counts = vec![];

        for array in batch.columns() {
            let array_data = array.to_data();
            offset = write_array_data(
                &array_data,
                &mut buffers,
                &mut arrow_data,
                &mut nodes,
                offset,
                array.len(),
                array.null_count(),
                compression_codec,
                write_options,
            )?;

            append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data);
        }
        // pad the tail of body data
        let len = arrow_data.len();
        let pad_len = pad_to_alignment(write_options.alignment, len);
        arrow_data.extend_from_slice(&PADDING[..pad_len]);

        // write data
        let buffers = fbb.create_vector(&buffers);
        let nodes = fbb.create_vector(&nodes);
        let variadic_buffer = if variadic_buffer_counts.is_empty() {
            None
        } else {
            Some(fbb.create_vector(&variadic_buffer_counts))
        };

        let root = {
            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
            batch_builder.add_length(batch.num_rows() as i64);
            batch_builder.add_nodes(nodes);
            batch_builder.add_buffers(buffers);
            if let Some(c) = compression {
                batch_builder.add_compression(c);
            }

            if let Some(v) = variadic_buffer {
                batch_builder.add_variadicBufferCounts(v);
            }
            let b = batch_builder.finish();
            b.as_union_value()
        };
        // create a crate::Message
        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::RecordBatch);
        message.add_bodyLength(arrow_data.len() as i64);
        message.add_header(root);
        let root = message.finish();
        fbb.finish(root, None);
        let finished_data = fbb.finished_data();

        Ok(EncodedData {
            ipc_message: finished_data.to_vec(),
            arrow_data,
        })
    }

    /// Write dictionary values into two sets of bytes, one for the header (crate::Message) and the
    /// other for the data
    fn dictionary_batch_to_bytes(
        &self,
        dict_id: i64,
        array_data: &ArrayData,
        write_options: &IpcWriteOptions,
    ) -> Result<EncodedData, ArrowError> {
        let mut fbb = FlatBufferBuilder::new();

        let mut nodes: Vec<crate::FieldNode> = vec![];
        let mut buffers: Vec<crate::Buffer> = vec![];
        let mut arrow_data: Vec<u8> = vec![];

        // get the type of compression
        let batch_compression_type = write_options.batch_compression_type;

        let compression = batch_compression_type.map(|batch_compression_type| {
            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
            c.add_method(crate::BodyCompressionMethod::BUFFER);
            c.add_codec(batch_compression_type);
            c.finish()
        });

        let compression_codec: Option<CompressionCodec> = batch_compression_type
            .map(|batch_compression_type| batch_compression_type.try_into())
            .transpose()?;

        write_array_data(
            array_data,
            &mut buffers,
            &mut arrow_data,
            &mut nodes,
            0,
            array_data.len(),
            array_data.null_count(),
            compression_codec,
            write_options,
        )?;

        let mut variadic_buffer_counts = vec![];
        append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);

        // pad the tail of body data
        let len = arrow_data.len();
        let pad_len = pad_to_alignment(write_options.alignment, len);
        arrow_data.extend_from_slice(&PADDING[..pad_len]);

        // write data
        let buffers = fbb.create_vector(&buffers);
        let nodes = fbb.create_vector(&nodes);
        let variadic_buffer = if variadic_buffer_counts.is_empty() {
            None
        } else {
            Some(fbb.create_vector(&variadic_buffer_counts))
        };

        let root = {
            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
            batch_builder.add_length(array_data.len() as i64);
            batch_builder.add_nodes(nodes);
            batch_builder.add_buffers(buffers);
            if let Some(c) = compression {
                batch_builder.add_compression(c);
            }
            if let Some(v) = variadic_buffer {
                batch_builder.add_variadicBufferCounts(v);
            }
            batch_builder.finish()
        };

        let root = {
            let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut fbb);
            batch_builder.add_id(dict_id);
            batch_builder.add_data(root);
            batch_builder.finish().as_union_value()
        };

        let root = {
            let mut message_builder = crate::MessageBuilder::new(&mut fbb);
            message_builder.add_version(write_options.metadata_version);
            message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
            message_builder.add_bodyLength(arrow_data.len() as i64);
            message_builder.add_header(root);
            message_builder.finish()
        };

        fbb.finish(root, None);
        let finished_data = fbb.finished_data();

        Ok(EncodedData {
            ipc_message: finished_data.to_vec(),
            arrow_data,
        })
    }
}

fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
    match array.data_type() {
        DataType::BinaryView | DataType::Utf8View => {
            // The spec documents that the counts only include the variadic buffers, not the view/null buffers.
            // https://arrow.apache.org/docs/format/Columnar.html#variadic-buffers
            counts.push(array.buffers().len() as i64 - 1);
        }
        DataType::Dictionary(_, _) => {
            // Do nothing
            // Dictionary types are handled in `encode_dictionaries`.
        }
        _ => {
            for child in array.child_data() {
                append_variadic_buffer_counts(counts, child)
            }
        }
    }
}

pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError> {
    match arr.data_type() {
        DataType::RunEndEncoded(k, _) => match k.data_type() {
            DataType::Int16 => {
                Ok(into_zero_offset_run_array(RunArray::<Int16Type>::from(arr))?.into_data())
            }
            DataType::Int32 => {
                Ok(into_zero_offset_run_array(RunArray::<Int32Type>::from(arr))?.into_data())
            }
            DataType::Int64 => {
                Ok(into_zero_offset_run_array(RunArray::<Int64Type>::from(arr))?.into_data())
            }
            d => unreachable!("Unexpected data type {d}"),
        },
        d => Err(ArrowError::InvalidArgumentError(format!(
            "The given array is not a run array. Data type of given array: {d}"
        ))),
    }
}

// Returns a `RunArray` with zero offset and length matching the last value
// in run_ends array.
fn into_zero_offset_run_array<R: RunEndIndexType>(
    run_array: RunArray<R>,
) -> Result<RunArray<R>, ArrowError> {
    let run_ends = run_array.run_ends();
    if run_ends.offset() == 0 && run_ends.max_value() == run_ends.len() {
        return Ok(run_array);
    }

    // The physical index of original run_ends array from which the `ArrayData` is sliced.
    let start_physical_index = run_ends.get_start_physical_index();

    // The physical index of original run_ends array until which the `ArrayData` is sliced.
    let end_physical_index = run_ends.get_end_physical_index();

    let physical_length = end_physical_index - start_physical_index + 1;

    // build new run_ends array by subtracting offset from run ends.
    let offset = R::Native::usize_as(run_ends.offset());
    let mut builder = BufferBuilder::<R::Native>::new(physical_length);
    for run_end_value in &run_ends.values()[start_physical_index..end_physical_index] {
        builder.append(run_end_value.sub_wrapping(offset));
    }
    builder.append(R::Native::from_usize(run_array.len()).unwrap());
    let new_run_ends = unsafe {
        // Safety:
        // The function builds a valid run_ends array and hence need not be validated.
        ArrayDataBuilder::new(R::DATA_TYPE)
            .len(physical_length)
            .add_buffer(builder.finish())
            .build_unchecked()
    };

    // build new values by slicing physical indices.
    let new_values = run_array
        .values()
        .slice(start_physical_index, physical_length)
        .into_data();

    let builder = ArrayDataBuilder::new(run_array.data_type().clone())
        .len(run_array.len())
        .add_child_data(new_run_ends)
        .add_child_data(new_values);
    let array_data = unsafe {
        // Safety:
        //  This function builds a valid run array and hence can skip validation.
        builder.build_unchecked()
    };
    Ok(array_data.into())
}

/// Keeps track of dictionaries that have been written, to avoid emitting the same dictionary
/// multiple times.
///
/// Can optionally error if an update to an existing dictionary is attempted, which
/// isn't allowed in the `FileWriter`.
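///
/// # Example
///
/// A minimal sketch of the tracking behaviour: the first insert of a dictionary
/// column under a given ID reports that it should be emitted, while repeating the
/// same dictionary does not (the values below are purely illustrative).
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{types::Int32Type, ArrayRef, DictionaryArray};
/// # use arrow_ipc::writer::DictionaryTracker;
/// let mut tracker = DictionaryTracker::new(true);
/// let dict: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
/// let column: ArrayRef = Arc::new(dict);
/// let dict_id = tracker.next_dict_id();
/// // First time: emit the dictionary
/// assert!(tracker.insert(dict_id, &column).unwrap());
/// // Same dictionary again: nothing new to emit
/// assert!(!tracker.insert(dict_id, &column).unwrap());
/// ```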
#[derive(Debug)]
pub struct DictionaryTracker {
    written: HashMap<i64, ArrayData>,
    dict_ids: Vec<i64>,
    error_on_replacement: bool,
}

impl DictionaryTracker {
    /// Create a new [`DictionaryTracker`].
    ///
    /// If `error_on_replacement` is true, an error will be generated if an
    /// update to an existing dictionary is attempted.
    ///
    /// Dictionary IDs are assigned sequentially via [`Self::next_dict_id`],
    /// incrementing the last seen dictionary ID (or using `0` if no other
    /// dictionary IDs have been seen).
    pub fn new(error_on_replacement: bool) -> Self {
        #[allow(deprecated)]
        Self {
            written: HashMap::new(),
            dict_ids: Vec::new(),
            error_on_replacement,
        }
    }

    /// Record and return the next dictionary ID.
    pub fn next_dict_id(&mut self) -> i64 {
        let next = self
            .dict_ids
            .last()
            .copied()
            .map(|i| i + 1)
            .unwrap_or_default();

        self.dict_ids.push(next);
        next
    }

    /// Return the sequence of dictionary IDs in the order they should be observed while
    /// traversing the schema
    pub fn dict_id(&mut self) -> &[i64] {
        &self.dict_ids
    }

    /// Keep track of the dictionary with the given ID and values. Behavior:
    ///
    /// * If this ID has been written already and has the same data, return `Ok(false)` to indicate
    ///   that the dictionary was not actually inserted (because it's already been seen).
    /// * If this ID has been written already but with different data, and this tracker is
    ///   configured to return an error, return an error.
    /// * If the tracker has not been configured to error on replacement or this dictionary
    ///   has never been seen before, return `Ok(true)` to indicate that the dictionary was just
    ///   inserted.
    pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> {
        let dict_data = column.to_data();
        let dict_values = &dict_data.child_data()[0];

        // If a dictionary with this id was already emitted, check if it was the same.
        if let Some(last) = self.written.get(&dict_id) {
            if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
                // Same dictionary values => no need to emit it again
                return Ok(false);
            }
            if self.error_on_replacement {
                // If error on replacement perform a logical comparison
                if last.child_data()[0] == *dict_values {
                    // Same dictionary values => no need to emit it again
                    return Ok(false);
                }
                return Err(ArrowError::InvalidArgumentError(
                    "Dictionary replacement detected when writing IPC file format. \
                     Arrow IPC files only support a single dictionary for a given field \
                     across all batches."
                        .to_string(),
                ));
            }
        }

        self.written.insert(dict_id, dict_data);
        Ok(true)
    }
}

/// Arrow File Writer
///
/// Writes Arrow [`RecordBatch`]es in the [IPC File Format].
///
/// # See Also
///
/// * [`StreamWriter`] for writing IPC Streams
///
/// # Example
/// ```
/// # use arrow_array::record_batch;
/// # use arrow_ipc::writer::FileWriter;
/// # let mut file = vec![]; // mimic a file for the example
/// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// // create a new writer, the schema must be known in advance
/// let mut writer = FileWriter::try_new(&mut file, &batch.schema()).unwrap();
/// // write each batch to the underlying writer
/// writer.write(&batch).unwrap();
/// // When all batches are written, call finish to flush all buffers
/// writer.finish().unwrap();
/// ```
/// [IPC File Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format
pub struct FileWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// A reference to the schema, used in validating record batches
    schema: SchemaRef,
    /// The number of bytes between each block of bytes, as an offset for random access
    block_offsets: usize,
    /// Dictionary blocks that will be written as part of the IPC footer
    dictionary_blocks: Vec<crate::Block>,
    /// Record blocks that will be written as part of the IPC footer
    record_blocks: Vec<crate::Block>,
    /// Whether the writer footer has been written, and the writer is finished
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,
    /// User level customized metadata
    custom_metadata: HashMap<String, String>,

    data_gen: IpcDataGenerator,
}

impl<W: Write> FileWriter<BufWriter<W>> {
    /// Try to create a new file writer with the writer wrapped in a BufWriter.
    ///
    /// See [`FileWriter::try_new`] for an unbuffered version.
    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        Self::try_new(BufWriter::new(writer), schema)
    }
}

impl<W: Write> FileWriter<W> {
    /// Try to create a new writer, with the schema written as part of the header
    ///
    /// Note the created writer is not buffered. See [`FileWriter::try_new_buffered`] for details.
    ///
    /// # Errors
    ///
    /// An [`Err`](Result::Err) may be returned if writing the header to the writer fails.
    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        let write_options = IpcWriteOptions::default();
        Self::try_new_with_options(writer, schema, write_options)
    }

    /// Try to create a new writer with IpcWriteOptions
    ///
    /// Note the created writer is not buffered. See [`FileWriter::try_new_buffered`] for details.
    ///
    /// # Errors
    ///
    /// An [`Err`](Result::Err) may be returned if writing the header to the writer fails.
    pub fn try_new_with_options(
        mut writer: W,
        schema: &Schema,
        write_options: IpcWriteOptions,
    ) -> Result<Self, ArrowError> {
        let data_gen = IpcDataGenerator::default();
        // write magic to header aligned on alignment boundary
        let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
        let header_size = super::ARROW_MAGIC.len() + pad_len;
        writer.write_all(&super::ARROW_MAGIC)?;
        writer.write_all(&PADDING[..pad_len])?;
        // write the schema, set the written bytes to the schema + header
        let mut dictionary_tracker = DictionaryTracker::new(true);
        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
            schema,
            &mut dictionary_tracker,
            &write_options,
        );
        let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
        Ok(Self {
            writer,
            write_options,
            schema: Arc::new(schema.clone()),
            block_offsets: meta + data + header_size,
            dictionary_blocks: vec![],
            record_blocks: vec![],
            finished: false,
            dictionary_tracker,
            custom_metadata: HashMap::new(),
            data_gen,
        })
    }

    /// Adds a key-value pair to the [FileWriter]'s custom metadata
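    ///
    /// # Example
    ///
    /// A small sketch; the key and value shown are arbitrary. Custom metadata
    /// is written into the footer when the writer is finished:
    ///
    /// ```
    /// # use arrow_ipc::writer::FileWriter;
    /// # use arrow_schema::Schema;
    /// let schema = Schema::empty();
    /// let mut writer = FileWriter::try_new(Vec::<u8>::new(), &schema).unwrap();
    /// writer.write_metadata("creator", "example-program");
    /// writer.finish().unwrap();
    /// ```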
    pub fn write_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
        self.custom_metadata.insert(key.into(), value.into());
    }

    /// Write a record batch to the file
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write record batch to file writer as it is closed".to_string(),
            ));
        }

        let (encoded_dictionaries, encoded_message) = self.data_gen.encoded_batch(
            batch,
            &mut self.dictionary_tracker,
            &self.write_options,
        )?;

        for encoded_dictionary in encoded_dictionaries {
            let (meta, data) =
                write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;

            let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
            self.dictionary_blocks.push(block);
            self.block_offsets += meta + data;
        }

        let (meta, data) = write_message(&mut self.writer, encoded_message, &self.write_options)?;
        // add a record block for the footer
        let block = crate::Block::new(
            self.block_offsets as i64,
            meta as i32, // TODO: is this still applicable?
            data as i64,
        );
        self.record_blocks.push(block);
        self.block_offsets += meta + data;
        Ok(())
    }

    /// Write footer and closing tag, then mark the writer as done
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write footer to file writer as it is closed".to_string(),
            ));
        }

        // write EOS
        write_continuation(&mut self.writer, &self.write_options, 0)?;

        let mut fbb = FlatBufferBuilder::new();
        let dictionaries = fbb.create_vector(&self.dictionary_blocks);
        let record_batches = fbb.create_vector(&self.record_blocks);
        let mut dictionary_tracker = DictionaryTracker::new(true);
        let schema = IpcSchemaEncoder::new()
            .with_dictionary_tracker(&mut dictionary_tracker)
            .schema_to_fb_offset(&mut fbb, &self.schema);
        let fb_custom_metadata = (!self.custom_metadata.is_empty())
            .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));

        let root = {
            let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
            footer_builder.add_version(self.write_options.metadata_version);
            footer_builder.add_schema(schema);
            footer_builder.add_dictionaries(dictionaries);
            footer_builder.add_recordBatches(record_batches);
            if let Some(fb_custom_metadata) = fb_custom_metadata {
                footer_builder.add_custom_metadata(fb_custom_metadata);
            }
            footer_builder.finish()
        };
        fbb.finish(root, None);
        let footer_data = fbb.finished_data();
        self.writer.write_all(footer_data)?;
        self.writer
            .write_all(&(footer_data.len() as i32).to_le_bytes())?;
        self.writer.write_all(&super::ARROW_MAGIC)?;
        self.writer.flush()?;
        self.finished = true;

        Ok(())
    }

    /// Returns the arrow [`SchemaRef`] for this arrow file.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Gets a reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }

    /// Gets a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }

    /// Flush the underlying writer.
    ///
    /// If the writer is wrapped in a [`BufWriter`], both the BufWriter and the
    /// underlying writer are flushed.
    pub fn flush(&mut self) -> Result<(), ArrowError> {
        self.writer.flush()?;
        Ok(())
    }

    /// Unwraps the underlying writer.
    ///
    /// The writer is flushed and the FileWriter is finished before returning.
    ///
    /// # Errors
    ///
    /// An [`Err`](Result::Err) may be returned if an error occurs while finishing the FileWriter
    /// or while flushing the writer.
    pub fn into_inner(mut self) -> Result<W, ArrowError> {
        if !self.finished {
            // `finish` flushes the writer.
            self.finish()?;
        }
        Ok(self.writer)
    }
}

impl<W: Write> RecordBatchWriter for FileWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}

/// Arrow Stream Writer
///
/// Writes Arrow [`RecordBatch`]es to bytes using the [IPC Streaming Format].
///
/// # See Also
///
/// * [`FileWriter`] for writing IPC Files
///
/// # Example
/// ```
/// # use arrow_array::record_batch;
/// # use arrow_ipc::writer::StreamWriter;
/// # let mut stream = vec![]; // mimic a stream for the example
/// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// // create a new writer, the schema must be known in advance
/// let mut writer = StreamWriter::try_new(&mut stream, &batch.schema()).unwrap();
/// // write each batch to the underlying stream
/// writer.write(&batch).unwrap();
/// // When all batches are written, call finish to flush all buffers
/// writer.finish().unwrap();
/// ```
///
/// [IPC Streaming Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
pub struct StreamWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// Whether the writer footer has been written, and the writer is finished
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,

    data_gen: IpcDataGenerator,
}

impl<W: Write> StreamWriter<BufWriter<W>> {
    /// Try to create a new stream writer with the writer wrapped in a BufWriter.
    ///
    /// See [`StreamWriter::try_new`] for an unbuffered version.
    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        Self::try_new(BufWriter::new(writer), schema)
    }
}

impl<W: Write> StreamWriter<W> {
    /// Try to create a new writer, with the schema written as part of the header.
    ///
    /// Note that there is no internal buffering. See also [`StreamWriter::try_new_buffered`].
    ///
    /// # Errors
    ///
    /// An [`Err`](Result::Err) may be returned if writing the header to the writer fails.
    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        let write_options = IpcWriteOptions::default();
        Self::try_new_with_options(writer, schema, write_options)
    }

    /// Try to create a new writer with [`IpcWriteOptions`].
    ///
    /// # Errors
    ///
    /// An [`Err`](Result::Err) may be returned if writing the header to the writer fails.
    pub fn try_new_with_options(
        mut writer: W,
        schema: &Schema,
        write_options: IpcWriteOptions,
    ) -> Result<Self, ArrowError> {
        let data_gen = IpcDataGenerator::default();
        let mut dictionary_tracker = DictionaryTracker::new(false);

        // write the schema, set the written bytes to the schema
        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
            schema,
            &mut dictionary_tracker,
            &write_options,
        );
        write_message(&mut writer, encoded_message, &write_options)?;
        Ok(Self {
            writer,
            write_options,
            finished: false,
            dictionary_tracker,
            data_gen,
        })
    }

    /// Write a record batch to the stream
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write record batch to stream writer as it is closed".to_string(),
            ));
        }

        let (encoded_dictionaries, encoded_message) = self
            .data_gen
            .encoded_batch(batch, &mut self.dictionary_tracker, &self.write_options)
            .expect("StreamWriter is configured to not error on dictionary replacement");

        for encoded_dictionary in encoded_dictionaries {
            write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
        }

        write_message(&mut self.writer, encoded_message, &self.write_options)?;
        Ok(())
    }

    /// Write continuation bytes, and mark the stream as done
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write footer to stream writer as it is closed".to_string(),
            ));
        }

        write_continuation(&mut self.writer, &self.write_options, 0)?;

        self.finished = true;

        Ok(())
    }

    /// Gets a reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }

    /// Gets a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }

    /// Flush the underlying writer.
    ///
    /// If the writer is wrapped in a [`BufWriter`], both the BufWriter and the
    /// underlying writer are flushed.
    pub fn flush(&mut self) -> Result<(), ArrowError> {
        self.writer.flush()?;
        Ok(())
    }

    /// Unwraps the underlying writer.
    ///
    /// The writer is flushed and the StreamWriter is finished before returning.
    ///
    /// # Errors
    ///
    /// An [`Err`](Result::Err) may be returned if an error occurs while finishing the StreamWriter
    /// or while flushing the writer.
    ///
    /// # Example
    ///
    /// ```
    /// # use arrow_ipc::writer::{StreamWriter, IpcWriteOptions};
    /// # use arrow_ipc::MetadataVersion;
    /// # use arrow_schema::{ArrowError, Schema};
    /// # fn main() -> Result<(), ArrowError> {
    /// // The result we expect from an empty schema
    /// let expected = vec![
    ///     255, 255, 255, 255,  48,   0,   0,   0,
    ///      16,   0,   0,   0,   0,   0,  10,   0,
    ///      12,   0,  10,   0,   9,   0,   4,   0,
    ///      10,   0,   0,   0,  16,   0,   0,   0,
    ///       0,   1,   4,   0,   8,   0,   8,   0,
    ///       0,   0,   4,   0,   8,   0,   0,   0,
    ///       4,   0,   0,   0,   0,   0,   0,   0,
    ///     255, 255, 255, 255,   0,   0,   0,   0
    /// ];
    ///
    /// let schema = Schema::empty();
    /// let buffer: Vec<u8> = Vec::new();
    /// let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)?;
    /// let stream_writer = StreamWriter::try_new_with_options(buffer, &schema, options)?;
    ///
    /// assert_eq!(stream_writer.into_inner()?, expected);
    /// # Ok(())
    /// # }
    /// ```
    pub fn into_inner(mut self) -> Result<W, ArrowError> {
        if !self.finished {
            // `finish` flushes.
            self.finish()?;
        }
        Ok(self.writer)
    }
}

impl<W: Write> RecordBatchWriter for StreamWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}

/// Stores the encoded data, which is a crate::Message, and optional Arrow data
pub struct EncodedData {
    /// An encoded crate::Message
    pub ipc_message: Vec<u8>,
    /// Arrow buffers to be written, should be an empty vec for schema messages
    pub arrow_data: Vec<u8>,
}

/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
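///
/// # Example
///
/// A minimal sketch writing a schema message to an in-memory buffer; schema
/// messages have no body, so the returned body length is zero:
///
/// ```
/// # use arrow_ipc::writer::{write_message, DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
/// # use arrow_schema::{DataType, Field, Schema};
/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
/// let options = IpcWriteOptions::default();
/// let mut tracker = DictionaryTracker::new(false);
/// let encoded = IpcDataGenerator::default()
///     .schema_to_bytes_with_dictionary_tracker(&schema, &mut tracker, &options);
/// let mut out: Vec<u8> = Vec::new();
/// let (meta_len, body_len) = write_message(&mut out, encoded, &options).unwrap();
/// assert!(meta_len > 0);
/// assert_eq!(body_len, 0);
/// ```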
pub fn write_message<W: Write>(
    mut writer: W,
    encoded: EncodedData,
    write_options: &IpcWriteOptions,
) -> Result<(usize, usize), ArrowError> {
    let arrow_data_len = encoded.arrow_data.len();
    if arrow_data_len % usize::from(write_options.alignment) != 0 {
        return Err(ArrowError::MemoryError(
            "Arrow data not aligned".to_string(),
        ));
    }

    let a = usize::from(write_options.alignment - 1);
    let buffer = encoded.ipc_message;
    let flatbuf_size = buffer.len();
    let prefix_size = if write_options.write_legacy_ipc_format {
        4
    } else {
        8
    };
    let aligned_size = (flatbuf_size + prefix_size + a) & !a;
    let padding_bytes = aligned_size - flatbuf_size - prefix_size;

    write_continuation(
        &mut writer,
        write_options,
        (aligned_size - prefix_size) as i32,
    )?;

    // write the flatbuf
    if flatbuf_size > 0 {
        writer.write_all(&buffer)?;
    }
    // write padding
    writer.write_all(&PADDING[..padding_bytes])?;

    // write arrow data
    let body_len = if arrow_data_len > 0 {
        write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
    } else {
        0
    };

    Ok((aligned_size, body_len))
}

fn write_body_buffers<W: Write>(
    mut writer: W,
    data: &[u8],
    alignment: u8,
) -> Result<usize, ArrowError> {
    let len = data.len();
    let pad_len = pad_to_alignment(alignment, len);
    let total_len = len + pad_len;

    // write body buffer
    writer.write_all(data)?;
    if pad_len > 0 {
        writer.write_all(&PADDING[..pad_len])?;
    }

    writer.flush()?;
    Ok(total_len)
}

/// Write the continuation marker (for metadata versions that require one)
/// followed by the encapsulated message length `total_len`
fn write_continuation<W: Write>(
    mut writer: W,
    write_options: &IpcWriteOptions,
    total_len: i32,
) -> Result<usize, ArrowError> {
    let mut written = 8;

    // the version of the writer determines whether continuation markers should be added
    match write_options.metadata_version {
        crate::MetadataVersion::V1 | crate::MetadataVersion::V2 | crate::MetadataVersion::V3 => {
            unreachable!("Options with this metadata version cannot be created")
        }
        crate::MetadataVersion::V4 => {
            if !write_options.write_legacy_ipc_format {
                // v0.15.0 format
                writer.write_all(&CONTINUATION_MARKER)?;
                written = 4;
            }
            writer.write_all(&total_len.to_le_bytes()[..])?;
        }
        crate::MetadataVersion::V5 => {
            // write continuation marker and message length
            writer.write_all(&CONTINUATION_MARKER)?;
            writer.write_all(&total_len.to_le_bytes()[..])?;
        }
        z => panic!("Unsupported crate::MetadataVersion {z:?}"),
    };

    writer.flush()?;

    Ok(written)
}

/// In V4, null types have no validity bitmap.
/// In V5 and later, null and union types have no validity bitmap.
/// Run end encoded types have no validity bitmap.
fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) -> bool {
    if write_options.metadata_version < crate::MetadataVersion::V5 {
        !matches!(data_type, DataType::Null)
    } else {
        !matches!(
            data_type,
            DataType::Null | DataType::Union(_, _) | DataType::RunEndEncoded(_, _)
        )
    }
}

/// Whether to truncate the buffer
#[inline]
fn buffer_need_truncate(
    array_offset: usize,
    buffer: &Buffer,
    spec: &BufferSpec,
    min_length: usize,
) -> bool {
    spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
}

/// Returns byte width for a buffer spec. Only for `BufferSpec::FixedWidth`.
#[inline]
fn get_buffer_element_width(spec: &BufferSpec) -> usize {
    match spec {
        BufferSpec::FixedWidth { byte_width, .. } => *byte_width,
        _ => 0,
    }
}

/// Common functionality for re-encoding offsets. Returns the new offsets as well as
/// original start offset and length for use in slicing child data.
fn reencode_offsets<O: OffsetSizeTrait>(
    offsets: &Buffer,
    data: &ArrayData,
) -> (Buffer, usize, usize) {
    let offsets_slice: &[O] = offsets.typed_data::<O>();
    let offset_slice = &offsets_slice[data.offset()..data.offset() + data.len() + 1];

    let start_offset = offset_slice.first().unwrap();
    let end_offset = offset_slice.last().unwrap();

    let offsets = match start_offset.as_usize() {
        0 => {
            let size = size_of::<O>();
            offsets.slice_with_length(data.offset() * size, (data.len() + 1) * size)
        }
        _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
    };

    let start_offset = start_offset.as_usize();
    let end_offset = end_offset.as_usize();

    (offsets, start_offset, end_offset - start_offset)
}

/// Returns the values and offsets [`Buffer`] for a ByteArray with offset type `O`
///
/// In particular, this handles re-encoding the offsets if they don't start at `0`,
/// slicing the values buffer as appropriate. This helps reduce the encoded
/// size of sliced arrays, as values that have been sliced away are not encoded
fn get_byte_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Buffer) {
    if data.is_empty() {
        return (MutableBuffer::new(0).into(), MutableBuffer::new(0).into());
    }

    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
    let values = data.buffers()[1].slice_with_length(original_start_offset, len);
    (offsets, values)
}

/// Similar logic as [`get_byte_array_buffers()`] but slices the child array instead
/// of a values buffer.
fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, ArrayData) {
    if data.is_empty() {
        return (
            MutableBuffer::new(0).into(),
            data.child_data()[0].slice(0, 0),
        );
    }

    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
    let child_data = data.child_data()[0].slice(original_start_offset, len);
    (offsets, child_data)
}

1433/// Write array data to a vector of bytes
1434#[allow(clippy::too_many_arguments)]
1435fn write_array_data(
1436    array_data: &ArrayData,
1437    buffers: &mut Vec<crate::Buffer>,
1438    arrow_data: &mut Vec<u8>,
1439    nodes: &mut Vec<crate::FieldNode>,
1440    offset: i64,
1441    num_rows: usize,
1442    null_count: usize,
1443    compression_codec: Option<CompressionCodec>,
1444    write_options: &IpcWriteOptions,
1445) -> Result<i64, ArrowError> {
1446    let mut offset = offset;
1447    if !matches!(array_data.data_type(), DataType::Null) {
1448        nodes.push(crate::FieldNode::new(num_rows as i64, null_count as i64));
1449    } else {
1450        // A NullArray's null_count equals its len, but the `null_count` passed in is from ArrayData,
1451        // where null_count is always 0.
1452        nodes.push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
1453    }
1454    if has_validity_bitmap(array_data.data_type(), write_options) {
1455        // write null buffer if exists
1456        let null_buffer = match array_data.nulls() {
1457            None => {
1458                // create a buffer and fill it with valid bits
1459                let num_bytes = bit_util::ceil(num_rows, 8);
1460                let buffer = MutableBuffer::new(num_bytes);
1461                let buffer = buffer.with_bitset(num_bytes, true);
1462                buffer.into()
1463            }
1464            Some(buffer) => buffer.inner().sliced(),
1465        };
1466
1467        offset = write_buffer(
1468            null_buffer.as_slice(),
1469            buffers,
1470            arrow_data,
1471            offset,
1472            compression_codec,
1473            write_options.alignment,
1474        )?;
1475    }
1476
1477    let data_type = array_data.data_type();
1478    if matches!(data_type, DataType::Binary | DataType::Utf8) {
1479        let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
1480        for buffer in [offsets, values] {
1481            offset = write_buffer(
1482                buffer.as_slice(),
1483                buffers,
1484                arrow_data,
1485                offset,
1486                compression_codec,
1487                write_options.alignment,
1488            )?;
1489        }
1490    } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
1491        // Slicing the views buffer is safe and easy,
1492        // but pruning unneeded data buffers is much more nuanced since it's complicated to prove that no views reference the pruned buffers
1493        //
1494        // The current implementation just serializes the raw arrays as given and does not try to optimize anything.
1495        // If users want to "compact" the arrays prior to sending them over IPC,
1496        // they should consider the gc API suggested in #5513
1497        for buffer in array_data.buffers() {
1498            offset = write_buffer(
1499                buffer.as_slice(),
1500                buffers,
1501                arrow_data,
1502                offset,
1503                compression_codec,
1504                write_options.alignment,
1505            )?;
1506        }
1507    } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
1508        let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
1509        for buffer in [offsets, values] {
1510            offset = write_buffer(
1511                buffer.as_slice(),
1512                buffers,
1513                arrow_data,
1514                offset,
1515                compression_codec,
1516                write_options.alignment,
1517            )?;
1518        }
1519    } else if DataType::is_numeric(data_type)
1520        || DataType::is_temporal(data_type)
1521        || matches!(
1522            array_data.data_type(),
1523            DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
1524        )
1525    {
1526        // Truncate values
1527        assert_eq!(array_data.buffers().len(), 1);
1528
1529        let buffer = &array_data.buffers()[0];
1530        let layout = layout(data_type);
1531        let spec = &layout.buffers[0];
1532
1533        let byte_width = get_buffer_element_width(spec);
1534        let min_length = array_data.len() * byte_width;
1535        let buffer_slice = if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
1536            let byte_offset = array_data.offset() * byte_width;
1537            let buffer_length = min(min_length, buffer.len() - byte_offset);
1538            &buffer.as_slice()[byte_offset..(byte_offset + buffer_length)]
1539        } else {
1540            buffer.as_slice()
1541        };
1542        offset = write_buffer(
1543            buffer_slice,
1544            buffers,
1545            arrow_data,
1546            offset,
1547            compression_codec,
1548            write_options.alignment,
1549        )?;
1550    } else if matches!(data_type, DataType::Boolean) {
1551        // Bools are special because the payload (= 1 bit) is smaller than the physical container elements (= bytes).
1552        // The array data may not start at the physical boundary of the underlying buffer, so we need to shift bits around.
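        // For example (illustrative), an array with offset 3 and len 4 copies bits 3..7 of the
        // source buffer into a fresh, byte-aligned buffer via `bit_slice` before writing.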
1553        assert_eq!(array_data.buffers().len(), 1);
1554
1555        let buffer = &array_data.buffers()[0];
1556        let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
1557        offset = write_buffer(
1558            &buffer,
1559            buffers,
1560            arrow_data,
1561            offset,
1562            compression_codec,
1563            write_options.alignment,
1564        )?;
1565    } else if matches!(
1566        data_type,
1567        DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _)
1568    ) {
1569        assert_eq!(array_data.buffers().len(), 1);
1570        assert_eq!(array_data.child_data().len(), 1);
1571
1572        // Truncate offsets and the child data to avoid writing unnecessary data
1573        let (offsets, sliced_child_data) = match data_type {
1574            DataType::List(_) => get_list_array_buffers::<i32>(array_data),
1575            DataType::Map(_, _) => get_list_array_buffers::<i32>(array_data),
1576            DataType::LargeList(_) => get_list_array_buffers::<i64>(array_data),
1577            _ => unreachable!(),
1578        };
1579        offset = write_buffer(
1580            offsets.as_slice(),
1581            buffers,
1582            arrow_data,
1583            offset,
1584            compression_codec,
1585            write_options.alignment,
1586        )?;
1587        offset = write_array_data(
1588            &sliced_child_data,
1589            buffers,
1590            arrow_data,
1591            nodes,
1592            offset,
1593            sliced_child_data.len(),
1594            sliced_child_data.null_count(),
1595            compression_codec,
1596            write_options,
1597        )?;
1598        return Ok(offset);
1599    } else if let DataType::FixedSizeList(_, fixed_size) = data_type {
1600        assert_eq!(array_data.child_data().len(), 1);
1601        let fixed_size = *fixed_size as usize;
1602
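        // Slice the child data to the elements covered by this (possibly sliced) array.
        // For example (illustrative), a FixedSizeList of size 3 sliced to offset 2 and len 4
        // covers child elements 6..18.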
1603        let child_offset = array_data.offset() * fixed_size;
1604        let child_length = array_data.len() * fixed_size;
1605        let child_data = array_data.child_data()[0].slice(child_offset, child_length);
1606
1607        offset = write_array_data(
1608            &child_data,
1609            buffers,
1610            arrow_data,
1611            nodes,
1612            offset,
1613            child_data.len(),
1614            child_data.null_count(),
1615            compression_codec,
1616            write_options,
1617        )?;
1618        return Ok(offset);
1619    } else {
1620        for buffer in array_data.buffers() {
1621            offset = write_buffer(
1622                buffer,
1623                buffers,
1624                arrow_data,
1625                offset,
1626                compression_codec,
1627                write_options.alignment,
1628            )?;
1629        }
1630    }
1631
1632    match array_data.data_type() {
1633        DataType::Dictionary(_, _) => {}
1634        DataType::RunEndEncoded(_, _) => {
1635            // unslice the run encoded array.
1636            let arr = unslice_run_array(array_data.clone())?;
1637            // recursively write out nested structures
1638            for data_ref in arr.child_data() {
1639                // write the nested data (e.g list data)
1640                offset = write_array_data(
1641                    data_ref,
1642                    buffers,
1643                    arrow_data,
1644                    nodes,
1645                    offset,
1646                    data_ref.len(),
1647                    data_ref.null_count(),
1648                    compression_codec,
1649                    write_options,
1650                )?;
1651            }
1652        }
1653        _ => {
1654            // recursively write out nested structures
1655            for data_ref in array_data.child_data() {
1656                // write the nested data (e.g list data)
1657                offset = write_array_data(
1658                    data_ref,
1659                    buffers,
1660                    arrow_data,
1661                    nodes,
1662                    offset,
1663                    data_ref.len(),
1664                    data_ref.null_count(),
1665                    compression_codec,
1666                    write_options,
1667                )?;
1668            }
1669        }
1670    }
1671    Ok(offset)
1672}
1673
1674/// Writes a buffer into `arrow_data`, a vector of bytes, and adds its
1675/// [`crate::Buffer`] to `buffers`. Returns the new offset in `arrow_data`.
1676///
1678/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
1679/// Each constituent buffer is first compressed with the indicated
1680/// compressor, and then written with the uncompressed length in the first 8
1681/// bytes as a 64-bit little-endian signed integer followed by the compressed
1682/// buffer bytes (and then padding as required by the protocol). The
1683/// uncompressed length may be set to -1 to indicate that the data that
1684/// follows is not compressed, which can be useful for cases where
1685/// compression does not yield appreciable savings.
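/// For example (illustrative): with no compression codec and 8-byte alignment, a 5-byte
/// buffer is appended as-is, followed by 3 zero padding bytes, and the returned offset
/// advances by 8. With a codec, the compressed bytes are preceded by the 8-byte
/// uncompressed-length prefix described above.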
1686fn write_buffer(
1687    buffer: &[u8],                    // input
1688    buffers: &mut Vec<crate::Buffer>, // output buffer descriptors
1689    arrow_data: &mut Vec<u8>,         // output stream
1690    offset: i64,                      // current output stream offset
1691    compression_codec: Option<CompressionCodec>,
1692    alignment: u8,
1693) -> Result<i64, ArrowError> {
1694    let len: i64 = match compression_codec {
1695        Some(compressor) => compressor.compress_to_vec(buffer, arrow_data)?,
1696        None => {
1697            arrow_data.extend_from_slice(buffer);
1698            buffer.len()
1699        }
1700    }
1701    .try_into()
1702    .map_err(|e| {
1703        ArrowError::InvalidArgumentError(format!("Could not convert compressed size to i64: {e}"))
1704    })?;
1705
1706    // make new index entry
1707    buffers.push(crate::Buffer::new(offset, len));
1708    // padding and make offset aligned
1709    let pad_len = pad_to_alignment(alignment, len as usize);
1710    arrow_data.extend_from_slice(&PADDING[..pad_len]);
1711
1712    Ok(offset + len + (pad_len as i64))
1713}
1714
1715const PADDING: [u8; 64] = [0; 64];
1716
1717/// Calculate the number of bytes of zero padding needed to round `len` up to a multiple of `alignment`
1718#[inline]
1719fn pad_to_alignment(alignment: u8, len: usize) -> usize {
1720    let a = usize::from(alignment - 1);
1721    ((len + a) & !a) - len
1722}
1723
1724#[cfg(test)]
1725mod tests {
1726    use std::hash::Hasher;
1727    use std::io::Cursor;
1728    use std::io::Seek;
1729
1730    use arrow_array::builder::FixedSizeListBuilder;
1731    use arrow_array::builder::Float32Builder;
1732    use arrow_array::builder::Int64Builder;
1733    use arrow_array::builder::MapBuilder;
1734    use arrow_array::builder::UnionBuilder;
1735    use arrow_array::builder::{GenericListBuilder, ListBuilder, StringBuilder};
1736    use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
1737    use arrow_array::types::*;
1738    use arrow_buffer::ScalarBuffer;
1739
1740    use crate::convert::fb_to_schema;
1741    use crate::reader::*;
1742    use crate::root_as_footer;
1743    use crate::MetadataVersion;
1744
1745    use super::*;
1746
1747    fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
1748        let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
1749        writer.write(rb).unwrap();
1750        writer.finish().unwrap();
1751        writer.into_inner().unwrap()
1752    }
1753
1754    fn deserialize_file(bytes: Vec<u8>) -> RecordBatch {
1755        let mut reader = FileReader::try_new(Cursor::new(bytes), None).unwrap();
1756        reader.next().unwrap().unwrap()
1757    }
1758
1759    fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
1760        // Use 8-byte alignment so that the various `truncate_*` tests can be compactly written,
1761        // without needing to construct a giant array to spill over the 64-byte default alignment
1762        // boundary.
1763        const IPC_ALIGNMENT: usize = 8;
1764
1765        let mut stream_writer = StreamWriter::try_new_with_options(
1766            vec![],
1767            record.schema_ref(),
1768            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
1769        )
1770        .unwrap();
1771        stream_writer.write(record).unwrap();
1772        stream_writer.finish().unwrap();
1773        stream_writer.into_inner().unwrap()
1774    }
1775
1776    fn deserialize_stream(bytes: Vec<u8>) -> RecordBatch {
1777        let mut stream_reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
1778        stream_reader.next().unwrap().unwrap()
1779    }
1780
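    // Minimal illustrative check (added as a sketch): `pad_to_alignment` returns how many
    // zero bytes are needed to round `len` up to the next multiple of `alignment`, which is
    // what `write_buffer` appends after each buffer.
    #[test]
    fn test_pad_to_alignment_examples() {
        assert_eq!(pad_to_alignment(8, 13), 3); // 13 rounds up to 16
        assert_eq!(pad_to_alignment(8, 16), 0); // already aligned
        assert_eq!(pad_to_alignment(64, 1), 63); // 1 rounds up to 64
    }
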
1781    #[test]
1782    #[cfg(feature = "lz4")]
1783    fn test_write_empty_record_batch_lz4_compression() {
1784        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
1785        let values: Vec<Option<i32>> = vec![];
1786        let array = Int32Array::from(values);
1787        let record_batch =
1788            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
1789
1790        let mut file = tempfile::tempfile().unwrap();
1791
1792        {
1793            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
1794                .unwrap()
1795                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
1796                .unwrap();
1797
1798            let mut writer =
1799                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
1800            writer.write(&record_batch).unwrap();
1801            writer.finish().unwrap();
1802        }
1803        file.rewind().unwrap();
1804        {
1805            // read file
1806            let reader = FileReader::try_new(file, None).unwrap();
1807            for read_batch in reader {
1808                read_batch
1809                    .unwrap()
1810                    .columns()
1811                    .iter()
1812                    .zip(record_batch.columns())
1813                    .for_each(|(a, b)| {
1814                        assert_eq!(a.data_type(), b.data_type());
1815                        assert_eq!(a.len(), b.len());
1816                        assert_eq!(a.null_count(), b.null_count());
1817                    });
1818            }
1819        }
1820    }
1821
1822    #[test]
1823    #[cfg(feature = "lz4")]
1824    fn test_write_file_with_lz4_compression() {
1825        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
1826        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
1827        let array = Int32Array::from(values);
1828        let record_batch =
1829            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
1830
1831        let mut file = tempfile::tempfile().unwrap();
1832        {
1833            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
1834                .unwrap()
1835                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
1836                .unwrap();
1837
1838            let mut writer =
1839                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
1840            writer.write(&record_batch).unwrap();
1841            writer.finish().unwrap();
1842        }
1843        file.rewind().unwrap();
1844        {
1845            // read file
1846            let reader = FileReader::try_new(file, None).unwrap();
1847            for read_batch in reader {
1848                read_batch
1849                    .unwrap()
1850                    .columns()
1851                    .iter()
1852                    .zip(record_batch.columns())
1853                    .for_each(|(a, b)| {
1854                        assert_eq!(a.data_type(), b.data_type());
1855                        assert_eq!(a.len(), b.len());
1856                        assert_eq!(a.null_count(), b.null_count());
1857                    });
1858            }
1859        }
1860    }
1861
1862    #[test]
1863    #[cfg(feature = "zstd")]
1864    fn test_write_file_with_zstd_compression() {
1865        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
1866        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
1867        let array = Int32Array::from(values);
1868        let record_batch =
1869            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
1870        let mut file = tempfile::tempfile().unwrap();
1871        {
1872            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
1873                .unwrap()
1874                .try_with_compression(Some(crate::CompressionType::ZSTD))
1875                .unwrap();
1876
1877            let mut writer =
1878                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
1879            writer.write(&record_batch).unwrap();
1880            writer.finish().unwrap();
1881        }
1882        file.rewind().unwrap();
1883        {
1884            // read file
1885            let reader = FileReader::try_new(file, None).unwrap();
1886            for read_batch in reader {
1887                read_batch
1888                    .unwrap()
1889                    .columns()
1890                    .iter()
1891                    .zip(record_batch.columns())
1892                    .for_each(|(a, b)| {
1893                        assert_eq!(a.data_type(), b.data_type());
1894                        assert_eq!(a.len(), b.len());
1895                        assert_eq!(a.null_count(), b.null_count());
1896                    });
1897            }
1898        }
1899    }
1900
1901    #[test]
1902    fn test_write_file() {
1903        let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
1904        let values: Vec<Option<u32>> = vec![
1905            Some(999),
1906            None,
1907            Some(235),
1908            Some(123),
1909            None,
1910            None,
1911            None,
1912            None,
1913            None,
1914        ];
1915        let array1 = UInt32Array::from(values);
1916        let batch =
1917            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array1) as ArrayRef])
1918                .unwrap();
1919        let mut file = tempfile::tempfile().unwrap();
1920        {
1921            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
1922
1923            writer.write(&batch).unwrap();
1924            writer.finish().unwrap();
1925        }
1926        file.rewind().unwrap();
1927
1928        {
1929            let mut reader = FileReader::try_new(file, None).unwrap();
1930            while let Some(Ok(read_batch)) = reader.next() {
1931                read_batch
1932                    .columns()
1933                    .iter()
1934                    .zip(batch.columns())
1935                    .for_each(|(a, b)| {
1936                        assert_eq!(a.data_type(), b.data_type());
1937                        assert_eq!(a.len(), b.len());
1938                        assert_eq!(a.null_count(), b.null_count());
1939                    });
1940            }
1941        }
1942    }
1943
1944    fn write_null_file(options: IpcWriteOptions) {
1945        let schema = Schema::new(vec![
1946            Field::new("nulls", DataType::Null, true),
1947            Field::new("int32s", DataType::Int32, false),
1948            Field::new("nulls2", DataType::Null, true),
1949            Field::new("f64s", DataType::Float64, false),
1950        ]);
1951        let array1 = NullArray::new(32);
1952        let array2 = Int32Array::from(vec![1; 32]);
1953        let array3 = NullArray::new(32);
1954        let array4 = Float64Array::from(vec![f64::NAN; 32]);
1955        let batch = RecordBatch::try_new(
1956            Arc::new(schema.clone()),
1957            vec![
1958                Arc::new(array1) as ArrayRef,
1959                Arc::new(array2) as ArrayRef,
1960                Arc::new(array3) as ArrayRef,
1961                Arc::new(array4) as ArrayRef,
1962            ],
1963        )
1964        .unwrap();
1965        let mut file = tempfile::tempfile().unwrap();
1966        {
1967            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
1968
1969            writer.write(&batch).unwrap();
1970            writer.finish().unwrap();
1971        }
1972
1973        file.rewind().unwrap();
1974
1975        {
1976            let reader = FileReader::try_new(file, None).unwrap();
1977            reader.for_each(|maybe_batch| {
1978                maybe_batch
1979                    .unwrap()
1980                    .columns()
1981                    .iter()
1982                    .zip(batch.columns())
1983                    .for_each(|(a, b)| {
1984                        assert_eq!(a.data_type(), b.data_type());
1985                        assert_eq!(a.len(), b.len());
1986                        assert_eq!(a.null_count(), b.null_count());
1987                    });
1988            });
1989        }
1990    }
1991    #[test]
1992    fn test_write_null_file_v4() {
1993        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
1994        write_null_file(IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap());
1995        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V4).unwrap());
1996        write_null_file(IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap());
1997    }
1998
1999    #[test]
2000    fn test_write_null_file_v5() {
2001        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2002        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V5).unwrap());
2003    }
2004
2005    #[test]
2006    fn track_union_nested_dict() {
2007        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2008
2009        let array = Arc::new(inner) as ArrayRef;
2010
2011        // Dict field with id 0
2012        #[allow(deprecated)]
2013        let dctfield = Field::new_dict("dict", array.data_type().clone(), false, 0, false);
2014        let union_fields = [(0, Arc::new(dctfield))].into_iter().collect();
2015
2016        let types = [0, 0, 0].into_iter().collect::<ScalarBuffer<i8>>();
2017        let offsets = [0, 1, 2].into_iter().collect::<ScalarBuffer<i32>>();
2018
2019        let union = UnionArray::try_new(union_fields, types, Some(offsets), vec![array]).unwrap();
2020
2021        let schema = Arc::new(Schema::new(vec![Field::new(
2022            "union",
2023            union.data_type().clone(),
2024            false,
2025        )]));
2026
2027        let gen = IpcDataGenerator {};
2028        let mut dict_tracker = DictionaryTracker::new(false);
2029        gen.schema_to_bytes_with_dictionary_tracker(
2030            &schema,
2031            &mut dict_tracker,
2032            &IpcWriteOptions::default(),
2033        );
2034
2035        let batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
2036
2037        gen.encoded_batch(&batch, &mut dict_tracker, &Default::default())
2038            .unwrap();
2039
2040        // The encoder will assign dict IDs itself to ensure uniqueness and ignore the dict ID in the schema
2041        // so we expect the dict will be keyed to 0
2042        assert!(dict_tracker.written.contains_key(&0));
2043    }
2044
2045    #[test]
2046    fn track_struct_nested_dict() {
2047        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2048
2049        let array = Arc::new(inner) as ArrayRef;
2050
2051        // Dict field with id 2
2052        #[allow(deprecated)]
2053        let dctfield = Arc::new(Field::new_dict(
2054            "dict",
2055            array.data_type().clone(),
2056            false,
2057            2,
2058            false,
2059        ));
2060
2061        let s = StructArray::from(vec![(dctfield, array)]);
2062        let struct_array = Arc::new(s) as ArrayRef;
2063
2064        let schema = Arc::new(Schema::new(vec![Field::new(
2065            "struct",
2066            struct_array.data_type().clone(),
2067            false,
2068        )]));
2069
2070        let gen = IpcDataGenerator {};
2071        let mut dict_tracker = DictionaryTracker::new(false);
2072        gen.schema_to_bytes_with_dictionary_tracker(
2073            &schema,
2074            &mut dict_tracker,
2075            &IpcWriteOptions::default(),
2076        );
2077
2078        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2079
2080        gen.encoded_batch(&batch, &mut dict_tracker, &Default::default())
2081            .unwrap();
2082
2083        assert!(dict_tracker.written.contains_key(&0));
2084    }
2085
2086    fn write_union_file(options: IpcWriteOptions) {
2087        let schema = Schema::new(vec![Field::new_union(
2088            "union",
2089            vec![0, 1],
2090            vec![
2091                Field::new("a", DataType::Int32, false),
2092                Field::new("c", DataType::Float64, false),
2093            ],
2094            UnionMode::Sparse,
2095        )]);
2096        let mut builder = UnionBuilder::with_capacity_sparse(5);
2097        builder.append::<Int32Type>("a", 1).unwrap();
2098        builder.append_null::<Int32Type>("a").unwrap();
2099        builder.append::<Float64Type>("c", 3.0).unwrap();
2100        builder.append_null::<Float64Type>("c").unwrap();
2101        builder.append::<Int32Type>("a", 4).unwrap();
2102        let union = builder.build().unwrap();
2103
2104        let batch =
2105            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(union) as ArrayRef])
2106                .unwrap();
2107
2108        let mut file = tempfile::tempfile().unwrap();
2109        {
2110            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2111
2112            writer.write(&batch).unwrap();
2113            writer.finish().unwrap();
2114        }
2115        file.rewind().unwrap();
2116
2117        {
2118            let reader = FileReader::try_new(file, None).unwrap();
2119            reader.for_each(|maybe_batch| {
2120                maybe_batch
2121                    .unwrap()
2122                    .columns()
2123                    .iter()
2124                    .zip(batch.columns())
2125                    .for_each(|(a, b)| {
2126                        assert_eq!(a.data_type(), b.data_type());
2127                        assert_eq!(a.len(), b.len());
2128                        assert_eq!(a.null_count(), b.null_count());
2129                    });
2130            });
2131        }
2132    }
2133
2134    #[test]
2135    fn test_write_union_file_v4_v5() {
2136        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2137        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2138    }
2139
2140    #[test]
2141    fn test_write_view_types() {
2142        const LONG_TEST_STRING: &str =
2143            "This is a long string to make sure binary view array handles it";
2144        let schema = Schema::new(vec![
2145            Field::new("field1", DataType::BinaryView, true),
2146            Field::new("field2", DataType::Utf8View, true),
2147        ]);
2148        let values: Vec<Option<&[u8]>> = vec![
2149            Some(b"foo"),
2150            Some(b"bar"),
2151            Some(LONG_TEST_STRING.as_bytes()),
2152        ];
2153        let binary_array = BinaryViewArray::from_iter(values);
2154        let utf8_array =
2155            StringViewArray::from_iter(vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]);
2156        let record_batch = RecordBatch::try_new(
2157            Arc::new(schema.clone()),
2158            vec![Arc::new(binary_array), Arc::new(utf8_array)],
2159        )
2160        .unwrap();
2161
2162        let mut file = tempfile::tempfile().unwrap();
2163        {
2164            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2165            writer.write(&record_batch).unwrap();
2166            writer.finish().unwrap();
2167        }
2168        file.rewind().unwrap();
2169        {
2170            let mut reader = FileReader::try_new(&file, None).unwrap();
2171            let read_batch = reader.next().unwrap().unwrap();
2172            read_batch
2173                .columns()
2174                .iter()
2175                .zip(record_batch.columns())
2176                .for_each(|(a, b)| {
2177                    assert_eq!(a, b);
2178                });
2179        }
2180        file.rewind().unwrap();
2181        {
2182            let mut reader = FileReader::try_new(&file, Some(vec![0])).unwrap();
2183            let read_batch = reader.next().unwrap().unwrap();
2184            assert_eq!(read_batch.num_columns(), 1);
2185            let read_array = read_batch.column(0);
2186            let write_array = record_batch.column(0);
2187            assert_eq!(read_array, write_array);
2188        }
2189    }
2190
2191    #[test]
2192    fn truncate_ipc_record_batch() {
2193        fn create_batch(rows: usize) -> RecordBatch {
2194            let schema = Schema::new(vec![
2195                Field::new("a", DataType::Int32, false),
2196                Field::new("b", DataType::Utf8, false),
2197            ]);
2198
2199            let a = Int32Array::from_iter_values(0..rows as i32);
2200            let b = StringArray::from_iter_values((0..rows).map(|i| i.to_string()));
2201
2202            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
2203        }
2204
2205        let big_record_batch = create_batch(65536);
2206
2207        let length = 5;
2208        let small_record_batch = create_batch(length);
2209
2210        let offset = 2;
2211        let record_batch_slice = big_record_batch.slice(offset, length);
2212        assert!(
2213            serialize_stream(&big_record_batch).len() > serialize_stream(&small_record_batch).len()
2214        );
2215        assert_eq!(
2216            serialize_stream(&small_record_batch).len(),
2217            serialize_stream(&record_batch_slice).len()
2218        );
2219
2220        assert_eq!(
2221            deserialize_stream(serialize_stream(&record_batch_slice)),
2222            record_batch_slice
2223        );
2224    }
2225
2226    #[test]
2227    fn truncate_ipc_record_batch_with_nulls() {
2228        fn create_batch() -> RecordBatch {
2229            let schema = Schema::new(vec![
2230                Field::new("a", DataType::Int32, true),
2231                Field::new("b", DataType::Utf8, true),
2232            ]);
2233
2234            let a = Int32Array::from(vec![Some(1), None, Some(1), None, Some(1)]);
2235            let b = StringArray::from(vec![None, Some("a"), Some("a"), None, Some("a")]);
2236
2237            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
2238        }
2239
2240        let record_batch = create_batch();
2241        let record_batch_slice = record_batch.slice(1, 2);
2242        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2243
2244        assert!(
2245            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2246        );
2247
2248        assert!(deserialized_batch.column(0).is_null(0));
2249        assert!(deserialized_batch.column(0).is_valid(1));
2250        assert!(deserialized_batch.column(1).is_valid(0));
2251        assert!(deserialized_batch.column(1).is_valid(1));
2252
2253        assert_eq!(record_batch_slice, deserialized_batch);
2254    }
2255
2256    #[test]
2257    fn truncate_ipc_dictionary_array() {
2258        fn create_batch() -> RecordBatch {
2259            let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
2260                .into_iter()
2261                .collect();
2262            let keys: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
2263
2264            let array = DictionaryArray::new(keys, Arc::new(values));
2265
2266            let schema = Schema::new(vec![Field::new("dict", array.data_type().clone(), true)]);
2267
2268            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
2269        }
2270
2271        let record_batch = create_batch();
2272        let record_batch_slice = record_batch.slice(1, 2);
2273        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2274
2275        assert!(
2276            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2277        );
2278
2279        assert!(deserialized_batch.column(0).is_valid(0));
2280        assert!(deserialized_batch.column(0).is_null(1));
2281
2282        assert_eq!(record_batch_slice, deserialized_batch);
2283    }
2284
2285    #[test]
2286    fn truncate_ipc_struct_array() {
2287        fn create_batch() -> RecordBatch {
2288            let strings: StringArray = [Some("foo"), None, Some("bar"), Some("baz")]
2289                .into_iter()
2290                .collect();
2291            let ints: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
2292
2293            let struct_array = StructArray::from(vec![
2294                (
2295                    Arc::new(Field::new("s", DataType::Utf8, true)),
2296                    Arc::new(strings) as ArrayRef,
2297                ),
2298                (
2299                    Arc::new(Field::new("c", DataType::Int32, true)),
2300                    Arc::new(ints) as ArrayRef,
2301                ),
2302            ]);
2303
2304            let schema = Schema::new(vec![Field::new(
2305                "struct_array",
2306                struct_array.data_type().clone(),
2307                true,
2308            )]);
2309
2310            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(struct_array)]).unwrap()
2311        }
2312
2313        let record_batch = create_batch();
2314        let record_batch_slice = record_batch.slice(1, 2);
2315        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2316
2317        assert!(
2318            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2319        );
2320
2321        let structs = deserialized_batch
2322            .column(0)
2323            .as_any()
2324            .downcast_ref::<StructArray>()
2325            .unwrap();
2326
2327        assert!(structs.column(0).is_null(0));
2328        assert!(structs.column(0).is_valid(1));
2329        assert!(structs.column(1).is_valid(0));
2330        assert!(structs.column(1).is_null(1));
2331        assert_eq!(record_batch_slice, deserialized_batch);
2332    }
2333
2334    #[test]
2335    fn truncate_ipc_string_array_with_all_empty_string() {
2336        fn create_batch() -> RecordBatch {
2337            let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
2338            let a = StringArray::from(vec![Some(""), Some(""), Some(""), Some(""), Some("")]);
2339            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap()
2340        }
2341
2342        let record_batch = create_batch();
2343        let record_batch_slice = record_batch.slice(0, 1);
2344        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2345
2346        assert!(
2347            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2348        );
2349        assert_eq!(record_batch_slice, deserialized_batch);
2350    }
2351
2352    #[test]
2353    fn test_stream_writer_writes_array_slice() {
2354        let array = UInt32Array::from(vec![Some(1), Some(2), Some(3)]);
2355        assert_eq!(
2356            vec![Some(1), Some(2), Some(3)],
2357            array.iter().collect::<Vec<_>>()
2358        );
2359
2360        let sliced = array.slice(1, 2);
2361        assert_eq!(vec![Some(2), Some(3)], sliced.iter().collect::<Vec<_>>());
2362
2363        let batch = RecordBatch::try_new(
2364            Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)])),
2365            vec![Arc::new(sliced)],
2366        )
2367        .expect("new batch");
2368
2369        let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
2370        writer.write(&batch).expect("write");
2371        let outbuf = writer.into_inner().expect("inner");
2372
2373        let mut reader = StreamReader::try_new(&outbuf[..], None).expect("new reader");
2374        let read_batch = reader.next().unwrap().expect("read batch");
2375
2376        let read_array: &UInt32Array = read_batch.column(0).as_primitive();
2377        assert_eq!(
2378            vec![Some(2), Some(3)],
2379            read_array.iter().collect::<Vec<_>>()
2380        );
2381    }
2382
2383    #[test]
2384    fn test_large_slice_uint32() {
2385        ensure_roundtrip(Arc::new(UInt32Array::from_iter((0..8000).map(|i| {
2386            if i % 2 == 0 {
2387                Some(i)
2388            } else {
2389                None
2390            }
2391        }))));
2392    }
2393
2394    #[test]
2395    fn test_large_slice_string() {
2396        let strings: Vec<_> = (0..8000)
2397            .map(|i| {
2398                if i % 2 == 0 {
2399                    Some(format!("value{i}"))
2400                } else {
2401                    None
2402                }
2403            })
2404            .collect();
2405
2406        ensure_roundtrip(Arc::new(StringArray::from(strings)));
2407    }
2408
2409    #[test]
2410    fn test_large_slice_string_list() {
2411        let mut ls = ListBuilder::new(StringBuilder::new());
2412
2413        let mut s = String::new();
2414        for row_number in 0..8000 {
2415            if row_number % 2 == 0 {
2416                for list_element in 0..1000 {
2417                    s.clear();
2418                    use std::fmt::Write;
2419                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
2420                    ls.values().append_value(&s);
2421                }
2422                ls.append(true)
2423            } else {
2424                ls.append(false); // null
2425            }
2426        }
2427
2428        ensure_roundtrip(Arc::new(ls.finish()));
2429    }
2430
2431    #[test]
2432    fn test_large_slice_string_list_of_lists() {
2433        // The reason for this special test is to verify reencode_offsets, which looks at both
2434        // the starting offset and the data offset. So we need a dataset where the starting offset
2435        // is zero but the data offset is not.
2436        let mut ls = ListBuilder::new(ListBuilder::new(StringBuilder::new()));
2437
2438        for _ in 0..4000 {
2439            ls.values().append(true);
2440            ls.append(true)
2441        }
2442
2443        let mut s = String::new();
2444        for row_number in 0..4000 {
2445            if row_number % 2 == 0 {
2446                for list_element in 0..1000 {
2447                    s.clear();
2448                    use std::fmt::Write;
2449                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
2450                    ls.values().values().append_value(&s);
2451                }
2452                ls.values().append(true);
2453                ls.append(true)
2454            } else {
2455                ls.append(false); // null
2456            }
2457        }
2458
2459        ensure_roundtrip(Arc::new(ls.finish()));
2460    }
2461
2462    /// Read/write a record batch to a File and Stream and ensure it is the same at the output
2463    fn ensure_roundtrip(array: ArrayRef) {
2464        let num_rows = array.len();
2465        let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
2466        // take off the first element
2467        let sliced_batch = orig_batch.slice(1, num_rows - 1);
2468
2469        let schema = orig_batch.schema();
2470        let stream_data = {
2471            let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
2472            writer.write(&sliced_batch).unwrap();
2473            writer.into_inner().unwrap()
2474        };
2475        let read_batch = {
2476            let projection = None;
2477            let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
2478            reader
2479                .next()
2480                .expect("expect no errors reading batch")
2481                .expect("expect batch")
2482        };
2483        assert_eq!(sliced_batch, read_batch);
2484
2485        let file_data = {
2486            let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
2487            writer.write(&sliced_batch).unwrap();
2488            writer.into_inner().unwrap().into_inner().unwrap()
2489        };
2490        let read_batch = {
2491            let projection = None;
2492            let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
2493            reader
2494                .next()
2495                .expect("expect no errors reading batch")
2496                .expect("expect batch")
2497        };
2498        assert_eq!(sliced_batch, read_batch);
2501    }
2502
2503    #[test]
2504    fn encode_bools_slice() {
2505        // Test case for https://github.com/apache/arrow-rs/issues/3496
2506        assert_bool_roundtrip([true, false], 1, 1);
2507
2508        // slice somewhere in the middle
2509        assert_bool_roundtrip(
2510            [
2511                true, false, true, true, false, false, true, true, true, false, false, false, true,
2512                true, true, true, false, false, false, false, true, true, true, true, true, false,
2513                false, false, false, false,
2514            ],
2515            13,
2516            17,
2517        );
2518
2519        // start at byte boundary, end in the middle
2520        assert_bool_roundtrip(
2521            [
2522                true, false, true, true, false, false, true, true, true, false, false, false,
2523            ],
2524            8,
2525            2,
2526        );
2527
2528        // start and stop at byte boundary
2529        assert_bool_roundtrip(
2530            [
2531                true, false, true, true, false, false, true, true, true, false, false, false, true,
2532                true, true, true, true, false, false, false, false, false,
2533            ],
2534            8,
2535            8,
2536        );
2537    }
2538
2539    fn assert_bool_roundtrip<const N: usize>(bools: [bool; N], offset: usize, length: usize) {
2540        let val_bool_field = Field::new("val", DataType::Boolean, false);
2541
2542        let schema = Arc::new(Schema::new(vec![val_bool_field]));
2543
2544        let bools = BooleanArray::from(bools.to_vec());
2545
2546        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(bools)]).unwrap();
2547        let batch = batch.slice(offset, length);
2548
2549        let data = serialize_stream(&batch);
2550        let batch2 = deserialize_stream(data);
2551        assert_eq!(batch, batch2);
2552    }
2553
2554    #[test]
2555    fn test_run_array_unslice() {
2556        let total_len = 80;
2557        let vals: Vec<Option<i32>> = vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
2558        let repeats: Vec<usize> = vec![3, 4, 1, 2];
2559        let mut input_array: Vec<Option<i32>> = Vec::with_capacity(total_len);
2560        for ix in 0_usize..32 {
2561            let repeat: usize = repeats[ix % repeats.len()];
2562            let val: Option<i32> = vals[ix % vals.len()];
2563            input_array.resize(input_array.len() + repeat, val);
2564        }
2565
2566        // Encode the input_array to run array
2567        let mut builder =
2568            PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
2569        builder.extend(input_array.iter().copied());
2570        let run_array = builder.finish();
2571
2572        // test for all slice lengths.
2573        for slice_len in 1..=total_len {
2574            // test for offset = 0, slice length = slice_len
2575            let sliced_run_array: RunArray<Int16Type> =
2576                run_array.slice(0, slice_len).into_data().into();
2577
2578            // Create unsliced run array.
2579            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
2580            let typed = unsliced_run_array
2581                .downcast::<PrimitiveArray<Int32Type>>()
2582                .unwrap();
2583            let expected: Vec<Option<i32>> = input_array.iter().take(slice_len).copied().collect();
2584            let actual: Vec<Option<i32>> = typed.into_iter().collect();
2585            assert_eq!(expected, actual);
2586
2587            // test for offset = total_len - slice_len, length = slice_len
2588            let sliced_run_array: RunArray<Int16Type> = run_array
2589                .slice(total_len - slice_len, slice_len)
2590                .into_data()
2591                .into();
2592
2593            // Create unsliced run array.
2594            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
2595            let typed = unsliced_run_array
2596                .downcast::<PrimitiveArray<Int32Type>>()
2597                .unwrap();
2598            let expected: Vec<Option<i32>> = input_array
2599                .iter()
2600                .skip(total_len - slice_len)
2601                .copied()
2602                .collect();
2603            let actual: Vec<Option<i32>> = typed.into_iter().collect();
2604            assert_eq!(expected, actual);
2605        }
2606    }
2607
2608    fn generate_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
2609        let mut ls = GenericListBuilder::<O, _>::new(UInt32Builder::new());
2610
2611        for i in 0..100_000 {
2612            for value in [i, i, i] {
2613                ls.values().append_value(value);
2614            }
2615            ls.append(true)
2616        }
2617
2618        ls.finish()
2619    }
2620
2621    fn generate_nested_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
2622        let mut ls =
2623            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
2624
2625        for _i in 0..10_000 {
2626            for j in 0..10 {
2627                for value in [j, j, j, j] {
2628                    ls.values().values().append_value(value);
2629                }
2630                ls.values().append(true)
2631            }
2632            ls.append(true);
2633        }
2634
2635        ls.finish()
2636    }
2637
2638    fn generate_nested_list_data_starting_at_zero<O: OffsetSizeTrait>() -> GenericListArray<O> {
2639        let mut ls =
2640            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
2641
2642        for _i in 0..999 {
2643            ls.values().append(true);
2644            ls.append(true);
2645        }
2646
2647        for j in 0..10 {
2648            for value in [j, j, j, j] {
2649                ls.values().values().append_value(value);
2650            }
2651            ls.values().append(true)
2652        }
2653        ls.append(true);
2654
2655        for i in 0..9_000 {
2656            for j in 0..10 {
2657                for value in [i + j, i + j, i + j, i + j] {
2658                    ls.values().values().append_value(value);
2659                }
2660                ls.values().append(true)
2661            }
2662            ls.append(true);
2663        }
2664
2665        ls.finish()
2666    }
2667
2668    fn generate_map_array_data() -> MapArray {
2669        let keys_builder = UInt32Builder::new();
2670        let values_builder = UInt32Builder::new();
2671
2672        let mut builder = MapBuilder::new(None, keys_builder, values_builder);
2673
2674        for i in 0..100_000 {
2675            for _j in 0..3 {
2676                builder.keys().append_value(i);
2677                builder.values().append_value(i * 2);
2678            }
2679            builder.append(true).unwrap();
2680        }
2681
2682        builder.finish()
2683    }
2684
2685    #[test]
2686    fn reencode_offsets_when_first_offset_is_not_zero() {
2687        let original_list = generate_list_data::<i32>();
2688        let original_data = original_list.into_data();
2689        let slice_data = original_data.slice(75, 7);
2690        let (new_offsets, original_start, length) =
2691            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
2692        assert_eq!(
2693            vec![0, 3, 6, 9, 12, 15, 18, 21],
2694            new_offsets.typed_data::<i32>()
2695        );
2696        assert_eq!(225, original_start);
2697        assert_eq!(21, length);
2698    }
2699
2700    #[test]
2701    fn reencode_offsets_when_first_offset_is_zero() {
2702        let mut ls = GenericListBuilder::<i32, _>::new(UInt32Builder::new());
2703        // ls = [[], [35, 42]]
2704        ls.append(true);
2705        ls.values().append_value(35);
2706        ls.values().append_value(42);
2707        ls.append(true);
2708        let original_list = ls.finish();
2709        let original_data = original_list.into_data();
2710
2711        let slice_data = original_data.slice(1, 1);
2712        let (new_offsets, original_start, length) =
2713            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
2714        assert_eq!(vec![0, 2], new_offsets.typed_data::<i32>());
2715        assert_eq!(0, original_start);
2716        assert_eq!(2, length);
2717    }
2718
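    // Illustrative sketch of `get_byte_array_buffers` (the test data here is arbitrary):
    // for a sliced string array, only the referenced offsets (rebased to start at 0) and
    // the referenced values bytes are kept for writing.
    #[test]
    fn get_byte_array_buffers_truncates_sliced_array() {
        let array = StringArray::from(vec!["a", "bb", "ccc", "dddd"]);
        let sliced = array.slice(1, 2); // ["bb", "ccc"]
        let data = sliced.into_data();

        let (offsets, values) = get_byte_array_buffers::<i32>(&data);
        assert_eq!(vec![0, 2, 5], offsets.typed_data::<i32>());
        assert_eq!(b"bbccc", values.as_slice());
    }
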
2719    /// Ensure that both the full and sliced versions round-trip (serialize then deserialize) equal to the original input.
2720    /// Also ensure the serialized sliced version is significantly smaller than the serialized full version.
2721    fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {
2722        // test both full and sliced versions
2723        let in_sliced = in_batch.slice(999, 1);
2724
2725        let bytes_batch = serialize_file(&in_batch);
2726        let bytes_sliced = serialize_file(&in_sliced);
2727
2728        // serializing 1 row should be significantly smaller than serializing 100,000
2729        assert!(bytes_sliced.len() < (bytes_batch.len() / expected_size_factor));
2730
2731        // ensure both are still valid and equal to originals
2732        let out_batch = deserialize_file(bytes_batch);
2733        assert_eq!(in_batch, out_batch);
2734
2735        let out_sliced = deserialize_file(bytes_sliced);
2736        assert_eq!(in_sliced, out_sliced);
2737    }
2738
2739    #[test]
2740    fn encode_lists() {
2741        let val_inner = Field::new_list_field(DataType::UInt32, true);
2742        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
2743        let schema = Arc::new(Schema::new(vec![val_list_field]));
2744
2745        let values = Arc::new(generate_list_data::<i32>());
2746
2747        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2748        roundtrip_ensure_sliced_smaller(in_batch, 1000);
2749    }
2750
2751    #[test]
2752    fn encode_empty_list() {
2753        let val_inner = Field::new_list_field(DataType::UInt32, true);
2754        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
2755        let schema = Arc::new(Schema::new(vec![val_list_field]));
2756
2757        let values = Arc::new(generate_list_data::<i32>());
2758
2759        let in_batch = RecordBatch::try_new(schema, vec![values])
2760            .unwrap()
2761            .slice(999, 0);
2762        let out_batch = deserialize_file(serialize_file(&in_batch));
2763        assert_eq!(in_batch, out_batch);
2764    }
2765
2766    #[test]
2767    fn encode_large_lists() {
2768        let val_inner = Field::new_list_field(DataType::UInt32, true);
2769        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
2770        let schema = Arc::new(Schema::new(vec![val_list_field]));
2771
2772        let values = Arc::new(generate_list_data::<i64>());
2773
2774        // ensure that both the full and sliced versions round-trip equal to the original input,
2775        // and that the serialized sliced version is significantly smaller than the serialized full version
2776        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2777        roundtrip_ensure_sliced_smaller(in_batch, 1000);
2778    }
2779
2780    #[test]
2781    fn encode_nested_lists() {
2782        let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
2783        let inner_list_field = Arc::new(Field::new_list_field(DataType::List(inner_int), true));
2784        let list_field = Field::new("val", DataType::List(inner_list_field), true);
2785        let schema = Arc::new(Schema::new(vec![list_field]));
2786
2787        let values = Arc::new(generate_nested_list_data::<i32>());
2788
2789        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2790        roundtrip_ensure_sliced_smaller(in_batch, 1000);
2791    }
2792
2793    #[test]
2794    fn encode_nested_lists_starting_at_zero() {
2795        let inner_int = Arc::new(Field::new("item", DataType::UInt32, true));
2796        let inner_list_field = Arc::new(Field::new("item", DataType::List(inner_int), true));
2797        let list_field = Field::new("val", DataType::List(inner_list_field), true);
2798        let schema = Arc::new(Schema::new(vec![list_field]));
2799
2800        let values = Arc::new(generate_nested_list_data_starting_at_zero::<i32>());
2801
2802        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2803        roundtrip_ensure_sliced_smaller(in_batch, 1);
2804    }
2805
2806    #[test]
2807    fn encode_map_array() {
2808        let keys = Arc::new(Field::new("keys", DataType::UInt32, false));
2809        let values = Arc::new(Field::new("values", DataType::UInt32, true));
2810        let map_field = Field::new_map("map", "entries", keys, values, false, true);
2811        let schema = Arc::new(Schema::new(vec![map_field]));
2812
2813        let values = Arc::new(generate_map_array_data());
2814
2815        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2816        roundtrip_ensure_sliced_smaller(in_batch, 1000);
2817    }
2818
    #[test]
    fn test_decimal128_alignment16_is_sufficient() {
        const IPC_ALIGNMENT: usize = 16;

        // Test a bunch of different dimensions to ensure alignment is never an issue.
        // For example, if we only test `num_cols = 1` then even with alignment 8 this
        // test would _happen_ to pass, even though for different dimensions like
        // `num_cols = 2` it would fail.
        for num_cols in [1, 2, 3, 17, 50, 73, 99] {
            let num_rows = (num_cols * 7 + 11) % 100; // Deterministic swizzle

            let mut fields = Vec::new();
            let mut arrays = Vec::new();
            for i in 0..num_cols {
                let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
                let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
                fields.push(field);
                arrays.push(Arc::new(array) as Arc<dyn Array>);
            }
            let schema = Schema::new(fields);
            let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();

            let mut writer = FileWriter::try_new_with_options(
                Vec::new(),
                batch.schema_ref(),
                IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
            )
            .unwrap();
            writer.write(&batch).unwrap();
            writer.finish().unwrap();

            let out: Vec<u8> = writer.into_inner().unwrap();

            let buffer = Buffer::from_vec(out);
            let trailer_start = buffer.len() - 10;
            let footer_len =
                read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
            let footer =
                root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();

            let schema = fb_to_schema(footer.schema().unwrap());

            // Importantly we set `require_alignment`, checking that 16-byte alignment is sufficient
            // for `read_record_batch` later on to read the data in a zero-copy manner.
            let decoder =
                FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);

            let batches = footer.recordBatches().unwrap();

            let block = batches.get(0);
            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
            let data = buffer.slice_with_length(block.offset() as _, block_len);

            let batch2 = decoder.read_record_batch(block, &data).unwrap().unwrap();

            assert_eq!(batch, batch2);
        }
    }

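    // Counterpart to the test above: with only 8-byte alignment, a Decimal128
    // buffer can end up mis-aligned within the IPC message, and a decoder that
    // requires alignment should reject it instead of silently copying the data
    // to an aligned buffer.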
    #[test]
    fn test_decimal128_alignment8_is_unaligned() {
        const IPC_ALIGNMENT: usize = 8;

        let num_cols = 2;
        let num_rows = 1;

        let mut fields = Vec::new();
        let mut arrays = Vec::new();
        for i in 0..num_cols {
            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
            let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
            fields.push(field);
            arrays.push(Arc::new(array) as Arc<dyn Array>);
        }
        let schema = Schema::new(fields);
        let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();

        let mut writer = FileWriter::try_new_with_options(
            Vec::new(),
            batch.schema_ref(),
            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
        )
        .unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();

        let out: Vec<u8> = writer.into_inner().unwrap();

        let buffer = Buffer::from_vec(out);
        let trailer_start = buffer.len() - 10;
        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
        let schema = fb_to_schema(footer.schema().unwrap());

        // Importantly we set `require_alignment`, otherwise the error later is suppressed due to copying
        // to an aligned buffer in `ArrayDataBuilder.build_aligned`.
        let decoder =
            FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);

        let batches = footer.recordBatches().unwrap();

        let block = batches.get(0);
        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
        let data = buffer.slice_with_length(block.offset() as _, block_len);

        let result = decoder.read_record_batch(block, &data);

        let error = result.unwrap_err();
        assert_eq!(
            error.to_string(),
            "Invalid argument error: Misaligned buffers[0] in array of type Decimal128(38, 10), \
             offset from expected alignment of 16 by 8"
        );
    }

    #[test]
    fn test_flush() {
        // We write a schema which is small enough to fit into a buffer and not get flushed,
        // and then force the write with .flush().
        let num_cols = 2;
        let mut fields = Vec::new();
        let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
        for i in 0..num_cols {
            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
            fields.push(field);
        }
        let schema = Schema::new(fields);
        let inner_stream_writer = BufWriter::with_capacity(1024, Vec::new());
        let inner_file_writer = BufWriter::with_capacity(1024, Vec::new());
        let mut stream_writer =
            StreamWriter::try_new_with_options(inner_stream_writer, &schema, options.clone())
                .unwrap();
        let mut file_writer =
            FileWriter::try_new_with_options(inner_file_writer, &schema, options).unwrap();

        let stream_bytes_written_on_new = stream_writer.get_ref().get_ref().len();
        let file_bytes_written_on_new = file_writer.get_ref().get_ref().len();
        stream_writer.flush().unwrap();
        file_writer.flush().unwrap();
        let stream_bytes_written_on_flush = stream_writer.get_ref().get_ref().len();
        let file_bytes_written_on_flush = file_writer.get_ref().get_ref().len();
        let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
        // Finishing a stream writes the continuation bytes in MetadataVersion::V5 (4 bytes)
        // and then a length of 0 (4 bytes) for a total of 8 bytes.
        // Everything before that should have been flushed in the .flush() call.
        let expected_stream_flushed_bytes = stream_out.len() - 8;
        // A file write is the same as the stream write except for the leading magic string
        // ARROW1 plus padding, which is 8 bytes.
        let expected_file_flushed_bytes = expected_stream_flushed_bytes + 8;
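        // Added sanity check (assuming the standard V5 end-of-stream encoding described
        // above): the finished stream should end with the 0xFFFFFFFF continuation marker
        // followed by a four-byte metadata length of 0.
        assert_eq!(
            &stream_out[stream_out.len() - 8..],
            &[0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00]
        );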

        assert!(
            stream_bytes_written_on_new < stream_bytes_written_on_flush,
            "this test makes no sense if flush is not actually required"
        );
        assert!(
            file_bytes_written_on_new < file_bytes_written_on_flush,
            "this test makes no sense if flush is not actually required"
        );
        assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
        assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
    }

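    // The following roundtrip tests exercise slicing of List-of-FixedSizeList data
    // at zero and non-zero offsets, where the writer must map the parent offset onto
    // the correct child value ranges when encoding the sliced batch.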
    #[test]
    fn test_roundtrip_list_of_fixed_list() -> Result<(), ArrowError> {
        let l1_type =
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, false)), 3);
        let l2_type = DataType::List(Arc::new(Field::new("item", l1_type.clone(), false)));

        let l0_builder = Float32Builder::new();
        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3).with_field(Arc::new(Field::new(
            "item",
            DataType::Float32,
            false,
        )));
        let mut l2_builder =
            ListBuilder::new(l1_builder).with_field(Arc::new(Field::new("item", l1_type, false)));

        for point in [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]] {
            l2_builder.values().values().append_value(point[0]);
            l2_builder.values().values().append_value(point[1]);
            l2_builder.values().values().append_value(point[2]);

            l2_builder.values().append(true);
        }
        l2_builder.append(true);

        let point = [10., 11., 12.];
        l2_builder.values().values().append_value(point[0]);
        l2_builder.values().values().append_value(point[1]);
        l2_builder.values().values().append_value(point[2]);

        l2_builder.values().append(true);
        l2_builder.append(true);

        let array = Arc::new(l2_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new("points", l2_type, false)],
            HashMap::default(),
        ));

        // Test a variety of combinations with both zero and non-zero offsets,
        // covering part of the array as well as the remainder of it
        test_slices(&array, &schema, 0, 1)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 1)?;

        Ok(())
    }

    #[test]
    fn test_roundtrip_list_of_fixed_list_w_nulls() -> Result<(), ArrowError> {
        let l0_builder = Float32Builder::new();
        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3);
        let mut l2_builder = ListBuilder::new(l1_builder);

        for point in [
            [Some(1.0), Some(2.0), None],
            [Some(4.0), Some(5.0), Some(6.0)],
            [None, Some(8.0), Some(9.0)],
        ] {
            for p in point {
                match p {
                    Some(p) => l2_builder.values().values().append_value(p),
                    None => l2_builder.values().values().append_null(),
                }
            }

            l2_builder.values().append(true);
        }
        l2_builder.append(true);

        let point = [Some(10.), None, None];
        for p in point {
            match p {
                Some(p) => l2_builder.values().values().append_value(p),
                None => l2_builder.values().values().append_null(),
            }
        }

        l2_builder.values().append(true);
        l2_builder.append(true);

        let array = Arc::new(l2_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new(
                "points",
                DataType::List(Arc::new(Field::new(
                    "item",
                    DataType::FixedSizeList(
                        Arc::new(Field::new("item", DataType::Float32, true)),
                        3,
                    ),
                    true,
                ))),
                true,
            )],
            HashMap::default(),
        ));

        // Test a variety of combinations with both zero and non-zero offsets,
        // covering part of the array as well as the remainder of it
        test_slices(&array, &schema, 0, 1)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 1)?;

        Ok(())
    }

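    // Helper: slices `parent_array` at the given offset and length, round-trips the
    // resulting single-column batch through the IPC stream writer and reader, and
    // asserts the batch is reconstructed unchanged.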
    fn test_slices(
        parent_array: &ArrayRef,
        schema: &SchemaRef,
        offset: usize,
        length: usize,
    ) -> Result<(), ArrowError> {
        let subarray = parent_array.slice(offset, length);
        let original_batch = RecordBatch::try_new(schema.clone(), vec![subarray])?;

        let mut bytes = Vec::new();
        let mut writer = StreamWriter::try_new(&mut bytes, schema)?;
        writer.write(&original_batch)?;
        writer.finish()?;

        let mut cursor = std::io::Cursor::new(bytes);
        let mut reader = StreamReader::try_new(&mut cursor, None)?;
        let returned_batch = reader.next().unwrap()?;

        assert_eq!(original_batch, returned_batch);

        Ok(())
    }

    #[test]
    fn test_roundtrip_fixed_list() -> Result<(), ArrowError> {
        let int_builder = Int64Builder::new();
        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3)
            .with_field(Arc::new(Field::new("item", DataType::Int64, false)));

        for point in [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]] {
            fixed_list_builder.values().append_value(point[0]);
            fixed_list_builder.values().append_value(point[1]);
            fixed_list_builder.values().append_value(point[2]);

            fixed_list_builder.append(true);
        }

        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new(
                "points",
                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, false)), 3),
                false,
            )],
            HashMap::default(),
        ));

        // Test a variety of combinations with both zero and non-zero offsets,
        // covering part of the array as well as the remainder of it
        test_slices(&array, &schema, 0, 4)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 3)?;
        test_slices(&array, &schema, 2, 1)?;

        Ok(())
    }

    #[test]
    fn test_roundtrip_fixed_list_w_nulls() -> Result<(), ArrowError> {
        let int_builder = Int64Builder::new();
        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3);

        for point in [
            [Some(1), Some(2), None],
            [Some(4), Some(5), Some(6)],
            [None, Some(8), Some(9)],
            [Some(10), None, None],
        ] {
            for p in point {
                match p {
                    Some(p) => fixed_list_builder.values().append_value(p),
                    None => fixed_list_builder.values().append_null(),
                }
            }

            fixed_list_builder.append(true);
        }

        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new(
                "points",
                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 3),
                true,
            )],
            HashMap::default(),
        ));

        // Test a variety of combinations with both zero and non-zero offsets,
        // covering part of the array as well as the remainder of it
        test_slices(&array, &schema, 0, 4)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 3)?;
        test_slices(&array, &schema, 2, 1)?;

        Ok(())
    }

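    // Schema- and field-level metadata are stored in `HashMap`s, whose iteration
    // order is not deterministic. This test checks that the writer nevertheless
    // emits byte-identical IPC output for the same logical metadata.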
    #[test]
    fn test_metadata_encoding_ordering() {
        fn create_hash() -> u64 {
            let metadata: HashMap<String, String> = [
                ("a", "1"), //
                ("b", "2"), //
                ("c", "3"), //
                ("d", "4"), //
                ("e", "5"), //
            ]
            .into_iter()
            .map(|(k, v)| (k.to_owned(), v.to_owned()))
            .collect();

            // Set metadata on both the schema and a field within it.
            let schema = Arc::new(
                Schema::new(vec![
                    Field::new("a", DataType::Int64, true).with_metadata(metadata.clone())
                ])
                .with_metadata(metadata)
                .clone(),
            );
            let batch = RecordBatch::new_empty(schema.clone());

            let mut bytes = Vec::new();
            let mut w = StreamWriter::try_new(&mut bytes, batch.schema_ref()).unwrap();
            w.write(&batch).unwrap();
            w.finish().unwrap();

            let mut h = std::hash::DefaultHasher::new();
            h.write(&bytes);
            h.finish()
        }

        let expected = create_hash();

        // Because `HashMap` iteration order is randomized and we cannot supply our
        // own hasher for the metadata maps, run the code above 20 times and verify
        // the result never changes. This is not a proof of determinism, but it
        // should be good enough in practice.
        let all_passed = (0..20).all(|_| create_hash() == expected);
        assert!(all_passed);
    }
}