arrow_ipc/writer.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Arrow IPC File and Stream Writers
//!
//! # Notes
//!
//! [`FileWriter`] and [`StreamWriter`] have similar interfaces,
//! however the [`FileWriter`] writes the [IPC File Format] (which ends with a
//! footer and supports random access), while the [`StreamWriter`] writes the
//! [IPC Streaming Format] (which can only be read sequentially).
//!
//! [IPC File Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format
//! [IPC Streaming Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format

use std::cmp::min;
use std::collections::HashMap;
use std::io::{BufWriter, Write};
use std::mem::size_of;
use std::sync::Arc;

use flatbuffers::FlatBufferBuilder;

use arrow_array::builder::BufferBuilder;
use arrow_array::cast::*;
use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
use arrow_array::*;
use arrow_buffer::bit_util;
use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
use arrow_data::{layout, ArrayData, ArrayDataBuilder, BufferSpec};
use arrow_schema::*;

use crate::compression::CompressionCodec;
use crate::convert::IpcSchemaEncoder;
use crate::CONTINUATION_MARKER;

/// IPC write options used to control the behaviour of the [`IpcDataGenerator`]
#[derive(Debug, Clone)]
pub struct IpcWriteOptions {
    /// Write padding after memory buffers to this multiple of bytes.
    /// Must be 8, 16, 32, or 64 - defaults to 64.
    alignment: u8,
    /// The legacy format is for releases before 0.15.0, and uses metadata V4
    write_legacy_ipc_format: bool,
    /// The metadata version to write. The Rust IPC writer supports V4+
    ///
    /// *Default versions per crate*
    ///
    /// When creating the default IpcWriteOptions, the following metadata versions are used:
    ///
    /// version 2.0.0: V4, with legacy format enabled
    /// version 4.0.0: V5
    metadata_version: crate::MetadataVersion,
    /// Compression, if desired. Will result in a runtime error
    /// if the corresponding feature is not enabled
    batch_compression_type: Option<crate::CompressionType>,
    /// Flag indicating whether the writer should preserve the dictionary IDs defined in the
    /// schema or generate unique dictionary IDs internally during encoding.
    ///
    /// Defaults to `false`
    #[deprecated(
        since = "54.0.0",
        note = "The ability to preserve dictionary IDs will be removed. With it, all fields related to it."
    )]
    preserve_dict_id: bool,
}

impl IpcWriteOptions {
    /// Configures compression when writing IPC files.
    ///
    /// Will result in a runtime error if the corresponding feature
    /// is not enabled
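    ///
    /// A minimal usage sketch (whether the codec is actually usable at write
    /// time depends on the enabled crate features):
    /// ```
    /// # use arrow_ipc::writer::IpcWriteOptions;
    /// # use arrow_ipc::CompressionType;
    /// // Request LZ4 frame compression for record batch buffers.
    /// let options = IpcWriteOptions::default()
    ///     .try_with_compression(Some(CompressionType::LZ4_FRAME))
    ///     .unwrap();
    /// ```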
    pub fn try_with_compression(
        mut self,
        batch_compression_type: Option<crate::CompressionType>,
    ) -> Result<Self, ArrowError> {
        self.batch_compression_type = batch_compression_type;

        if self.batch_compression_type.is_some()
            && self.metadata_version < crate::MetadataVersion::V5
        {
            return Err(ArrowError::InvalidArgumentError(
                "Compression only supported in metadata v5 and above".to_string(),
            ));
        }
        Ok(self)
    }
    /// Try to create IpcWriteOptions, checking for incompatible settings
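    ///
    /// For example, a sketch of creating options with 8-byte alignment and V5
    /// metadata (the same settings used by the stream writer example below):
    /// ```
    /// # use arrow_ipc::writer::IpcWriteOptions;
    /// # use arrow_ipc::MetadataVersion;
    /// let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
    /// // Unsupported alignments are rejected ...
    /// assert!(IpcWriteOptions::try_new(3, false, MetadataVersion::V5).is_err());
    /// // ... as is the legacy format together with V5 metadata.
    /// assert!(IpcWriteOptions::try_new(8, true, MetadataVersion::V5).is_err());
    /// ```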
    pub fn try_new(
        alignment: usize,
        write_legacy_ipc_format: bool,
        metadata_version: crate::MetadataVersion,
    ) -> Result<Self, ArrowError> {
        let is_alignment_valid =
            alignment == 8 || alignment == 16 || alignment == 32 || alignment == 64;
        if !is_alignment_valid {
            return Err(ArrowError::InvalidArgumentError(
                "Alignment should be 8, 16, 32, or 64.".to_string(),
            ));
        }
        let alignment: u8 = u8::try_from(alignment).expect("range already checked");
        match metadata_version {
            crate::MetadataVersion::V1
            | crate::MetadataVersion::V2
            | crate::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
                "Writing IPC metadata version 3 and lower not supported".to_string(),
            )),
            #[allow(deprecated)]
            crate::MetadataVersion::V4 => Ok(Self {
                alignment,
                write_legacy_ipc_format,
                metadata_version,
                batch_compression_type: None,
                preserve_dict_id: false,
            }),
            crate::MetadataVersion::V5 => {
                if write_legacy_ipc_format {
                    Err(ArrowError::InvalidArgumentError(
                        "Legacy IPC format only supported on metadata version 4".to_string(),
                    ))
                } else {
                    #[allow(deprecated)]
                    Ok(Self {
                        alignment,
                        write_legacy_ipc_format,
                        metadata_version,
                        batch_compression_type: None,
                        preserve_dict_id: false,
                    })
                }
            }
            z => Err(ArrowError::InvalidArgumentError(format!(
                "Unsupported crate::MetadataVersion {z:?}"
            ))),
        }
    }

    /// Return whether the writer is configured to preserve the dictionary IDs
    /// defined in the schema
    #[deprecated(
        since = "54.0.0",
        note = "The ability to preserve dictionary IDs will be removed. With it, all functions related to it."
    )]
    pub fn preserve_dict_id(&self) -> bool {
        #[allow(deprecated)]
        self.preserve_dict_id
    }

    /// Set whether the IPC writer should preserve the dictionary IDs in the schema
    /// or auto-assign unique dictionary IDs during encoding (defaults to `false`,
    /// i.e. auto-assign)
    ///
    /// If this option is true, the application must handle assigning ids
    /// to the dictionary batches in order to encode them correctly
    #[deprecated(
        since = "54.0.0",
        note = "The ability to preserve dictionary IDs will be removed. With it, all functions related to it."
    )]
    #[allow(deprecated)]
    pub fn with_preserve_dict_id(mut self, preserve_dict_id: bool) -> Self {
        self.preserve_dict_id = preserve_dict_id;
        self
    }
}

impl Default for IpcWriteOptions {
    fn default() -> Self {
        #[allow(deprecated)]
        Self {
            alignment: 64,
            write_legacy_ipc_format: false,
            metadata_version: crate::MetadataVersion::V5,
            batch_compression_type: None,
            preserve_dict_id: false,
        }
    }
}

#[derive(Debug, Default)]
/// Handles low level details of encoding [`Array`] and [`Schema`] into the
/// [Arrow IPC Format].
///
/// # Example
/// ```
/// # fn run() {
/// # use std::sync::Arc;
/// # use arrow_array::UInt64Array;
/// # use arrow_array::RecordBatch;
/// # use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
///
/// // Create a record batch
/// let batch = RecordBatch::try_from_iter(vec![
///  ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _)
/// ]).unwrap();
///
/// // Error if dictionary ids are replaced.
/// let error_on_replacement = true;
/// let options = IpcWriteOptions::default();
/// let mut dictionary_tracker = DictionaryTracker::new(error_on_replacement);
///
/// // Encode the batch into zero or more encoded dictionaries
/// // and the data for the actual array.
/// let data_gen = IpcDataGenerator::default();
/// let (encoded_dictionaries, encoded_message) = data_gen
///   .encoded_batch(&batch, &mut dictionary_tracker, &options)
///   .unwrap();
/// # }
/// ```
///
/// [Arrow IPC Format]: https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc
pub struct IpcDataGenerator {}

impl IpcDataGenerator {
    /// Converts a schema to an IPC message along with `dictionary_tracker`
    /// and returns it encoded inside [EncodedData] as a flatbuffer
    ///
    /// Preferred method over [IpcDataGenerator::schema_to_bytes], which has been
    /// deprecated since Arrow v54.0.0
    pub fn schema_to_bytes_with_dictionary_tracker(
        &self,
        schema: &Schema,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
    ) -> EncodedData {
        let mut fbb = FlatBufferBuilder::new();
        let schema = {
            let fb = IpcSchemaEncoder::new()
                .with_dictionary_tracker(dictionary_tracker)
                .schema_to_fb_offset(&mut fbb, schema);
            fb.as_union_value()
        };

        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::Schema);
        message.add_bodyLength(0);
        message.add_header(schema);
        // TODO: custom metadata
        let data = message.finish();
        fbb.finish(data, None);

        let data = fbb.finished_data();
        EncodedData {
            ipc_message: data.to_vec(),
            arrow_data: vec![],
        }
    }

    #[deprecated(
        since = "54.0.0",
        note = "Use `schema_to_bytes_with_dictionary_tracker` instead. This function will take the signature of `schema_to_bytes_with_dictionary_tracker` in the next release."
    )]
    /// Converts a schema to an IPC message and returns it encoded inside [EncodedData] as a flatbuffer
    pub fn schema_to_bytes(&self, schema: &Schema, write_options: &IpcWriteOptions) -> EncodedData {
        let mut fbb = FlatBufferBuilder::new();
        let schema = {
            #[allow(deprecated)]
            // This will be replaced with the IpcSchemaConverter in the next release.
            let fb = crate::convert::schema_to_fb_offset(&mut fbb, schema);
            fb.as_union_value()
        };

        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::Schema);
        message.add_bodyLength(0);
        message.add_header(schema);
        // TODO: custom metadata
        let data = message.finish();
        fbb.finish(data, None);

        let data = fbb.finished_data();
        EncodedData {
            ipc_message: data.to_vec(),
            arrow_data: vec![],
        }
    }

    fn _encode_dictionaries<I: Iterator<Item = i64>>(
        &self,
        column: &ArrayRef,
        encoded_dictionaries: &mut Vec<EncodedData>,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
        dict_id: &mut I,
    ) -> Result<(), ArrowError> {
        match column.data_type() {
            DataType::Struct(fields) => {
                let s = as_struct_array(column);
                for (field, column) in fields.iter().zip(s.columns()) {
                    self.encode_dictionaries(
                        field,
                        column,
                        encoded_dictionaries,
                        dictionary_tracker,
                        write_options,
                        dict_id,
                    )?;
                }
            }
            DataType::RunEndEncoded(_, values) => {
                let data = column.to_data();
                if data.child_data().len() != 2 {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "The run encoded array should have exactly two child arrays. Found {}",
                        data.child_data().len()
                    )));
                }
                // The run_ends array is not expected to be dictionary encoded. Hence encode dictionaries
                // only for values array.
                let values_array = make_array(data.child_data()[1].clone());
                self.encode_dictionaries(
                    values,
                    &values_array,
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::List(field) => {
                let list = as_list_array(column);
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::LargeList(field) => {
                let list = as_large_list_array(column);
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::FixedSizeList(field, _) => {
                let list = column
                    .as_any()
                    .downcast_ref::<FixedSizeListArray>()
                    .expect("Unable to downcast to fixed size list array");
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::Map(field, _) => {
                let map_array = as_map_array(column);

                let (keys, values) = match field.data_type() {
                    DataType::Struct(fields) if fields.len() == 2 => (&fields[0], &fields[1]),
                    _ => panic!("Incorrect field data type {:?}", field.data_type()),
                };

                // keys
                self.encode_dictionaries(
                    keys,
                    map_array.keys(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;

                // values
                self.encode_dictionaries(
                    values,
                    map_array.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::Union(fields, _) => {
                let union = as_union_array(column);
                for (type_id, field) in fields.iter() {
                    let column = union.child(type_id);
                    self.encode_dictionaries(
                        field,
                        column,
                        encoded_dictionaries,
                        dictionary_tracker,
                        write_options,
                        dict_id,
                    )?;
                }
            }
            _ => (),
        }

        Ok(())
    }

    fn encode_dictionaries<I: Iterator<Item = i64>>(
        &self,
        field: &Field,
        column: &ArrayRef,
        encoded_dictionaries: &mut Vec<EncodedData>,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
        dict_id_seq: &mut I,
    ) -> Result<(), ArrowError> {
        match column.data_type() {
            DataType::Dictionary(_key_type, _value_type) => {
                let dict_data = column.to_data();
                let dict_values = &dict_data.child_data()[0];

                let values = make_array(dict_data.child_data()[0].clone());

                self._encode_dictionaries(
                    &values,
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id_seq,
                )?;

                // It's important to only take the dict_id at this point, because the dict ID
                // sequence is assigned depth-first, so we need to first encode children and have
                // them take their assigned dict IDs before we take the dict ID for this field.
                #[allow(deprecated)]
                let dict_id = dict_id_seq
                    .next()
                    .or_else(|| field.dict_id())
                    .ok_or_else(|| {
                        ArrowError::IpcError(format!("no dict id for field {}", field.name()))
                    })?;

                let emit = dictionary_tracker.insert(dict_id, column)?;

                if emit {
                    encoded_dictionaries.push(self.dictionary_batch_to_bytes(
                        dict_id,
                        dict_values,
                        write_options,
                    )?);
                }
            }
            _ => self._encode_dictionaries(
                column,
                encoded_dictionaries,
                dictionary_tracker,
                write_options,
                dict_id_seq,
            )?,
        }

        Ok(())
    }

    /// Encodes a batch to a number of [EncodedData] items (dictionary batches + the record batch).
    /// The [DictionaryTracker] keeps track of dictionaries with new `dict_id`s (so they are only sent once).
    /// Make sure the [DictionaryTracker] is initialized at the start of the stream.
    pub fn encoded_batch(
        &self,
        batch: &RecordBatch,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
        let schema = batch.schema();
        let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len());

        let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter();

        for (i, field) in schema.fields().iter().enumerate() {
            let column = batch.column(i);
            self.encode_dictionaries(
                field,
                column,
                &mut encoded_dictionaries,
                dictionary_tracker,
                write_options,
                &mut dict_id,
            )?;
        }

        let encoded_message = self.record_batch_to_bytes(batch, write_options)?;
        Ok((encoded_dictionaries, encoded_message))
    }

    /// Write a `RecordBatch` into two sets of bytes, one for the header (crate::Message) and the
    /// other for the batch's data
    fn record_batch_to_bytes(
        &self,
        batch: &RecordBatch,
        write_options: &IpcWriteOptions,
    ) -> Result<EncodedData, ArrowError> {
        let mut fbb = FlatBufferBuilder::new();

        let mut nodes: Vec<crate::FieldNode> = vec![];
        let mut buffers: Vec<crate::Buffer> = vec![];
        let mut arrow_data: Vec<u8> = vec![];
        let mut offset = 0;

        // get the type of compression
        let batch_compression_type = write_options.batch_compression_type;

        let compression = batch_compression_type.map(|batch_compression_type| {
            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
            c.add_method(crate::BodyCompressionMethod::BUFFER);
            c.add_codec(batch_compression_type);
            c.finish()
        });

        let compression_codec: Option<CompressionCodec> =
            batch_compression_type.map(TryInto::try_into).transpose()?;

        let mut variadic_buffer_counts = vec![];

        for array in batch.columns() {
            let array_data = array.to_data();
            offset = write_array_data(
                &array_data,
                &mut buffers,
                &mut arrow_data,
                &mut nodes,
                offset,
                array.len(),
                array.null_count(),
                compression_codec,
                write_options,
            )?;

            append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data);
        }
        // pad the tail of body data
        let len = arrow_data.len();
        let pad_len = pad_to_alignment(write_options.alignment, len);
        arrow_data.extend_from_slice(&PADDING[..pad_len]);

        // write data
        let buffers = fbb.create_vector(&buffers);
        let nodes = fbb.create_vector(&nodes);
        let variadic_buffer = if variadic_buffer_counts.is_empty() {
            None
        } else {
            Some(fbb.create_vector(&variadic_buffer_counts))
        };

        let root = {
            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
            batch_builder.add_length(batch.num_rows() as i64);
            batch_builder.add_nodes(nodes);
            batch_builder.add_buffers(buffers);
            if let Some(c) = compression {
                batch_builder.add_compression(c);
            }

            if let Some(v) = variadic_buffer {
                batch_builder.add_variadicBufferCounts(v);
            }
            let b = batch_builder.finish();
            b.as_union_value()
        };
        // create a crate::Message
        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::RecordBatch);
        message.add_bodyLength(arrow_data.len() as i64);
        message.add_header(root);
        let root = message.finish();
        fbb.finish(root, None);
        let finished_data = fbb.finished_data();

        Ok(EncodedData {
            ipc_message: finished_data.to_vec(),
            arrow_data,
        })
    }

    /// Write dictionary values into two sets of bytes, one for the header (crate::Message) and the
    /// other for the data
    fn dictionary_batch_to_bytes(
        &self,
        dict_id: i64,
        array_data: &ArrayData,
        write_options: &IpcWriteOptions,
    ) -> Result<EncodedData, ArrowError> {
        let mut fbb = FlatBufferBuilder::new();

        let mut nodes: Vec<crate::FieldNode> = vec![];
        let mut buffers: Vec<crate::Buffer> = vec![];
        let mut arrow_data: Vec<u8> = vec![];

        // get the type of compression
        let batch_compression_type = write_options.batch_compression_type;

        let compression = batch_compression_type.map(|batch_compression_type| {
            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
            c.add_method(crate::BodyCompressionMethod::BUFFER);
            c.add_codec(batch_compression_type);
            c.finish()
        });

        let compression_codec: Option<CompressionCodec> = batch_compression_type
            .map(|batch_compression_type| batch_compression_type.try_into())
            .transpose()?;

        write_array_data(
            array_data,
            &mut buffers,
            &mut arrow_data,
            &mut nodes,
            0,
            array_data.len(),
            array_data.null_count(),
            compression_codec,
            write_options,
        )?;

        let mut variadic_buffer_counts = vec![];
        append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);

        // pad the tail of body data
        let len = arrow_data.len();
        let pad_len = pad_to_alignment(write_options.alignment, len);
        arrow_data.extend_from_slice(&PADDING[..pad_len]);

        // write data
        let buffers = fbb.create_vector(&buffers);
        let nodes = fbb.create_vector(&nodes);
        let variadic_buffer = if variadic_buffer_counts.is_empty() {
            None
        } else {
            Some(fbb.create_vector(&variadic_buffer_counts))
        };

        let root = {
            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
            batch_builder.add_length(array_data.len() as i64);
            batch_builder.add_nodes(nodes);
            batch_builder.add_buffers(buffers);
            if let Some(c) = compression {
                batch_builder.add_compression(c);
            }
            if let Some(v) = variadic_buffer {
                batch_builder.add_variadicBufferCounts(v);
            }
            batch_builder.finish()
        };

        let root = {
            let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut fbb);
            batch_builder.add_id(dict_id);
            batch_builder.add_data(root);
            batch_builder.finish().as_union_value()
        };

        let root = {
            let mut message_builder = crate::MessageBuilder::new(&mut fbb);
            message_builder.add_version(write_options.metadata_version);
            message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
            message_builder.add_bodyLength(arrow_data.len() as i64);
            message_builder.add_header(root);
            message_builder.finish()
        };

        fbb.finish(root, None);
        let finished_data = fbb.finished_data();

        Ok(EncodedData {
            ipc_message: finished_data.to_vec(),
            arrow_data,
        })
    }
}

fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
    match array.data_type() {
        DataType::BinaryView | DataType::Utf8View => {
            // The spec documents that the counts only include the variadic buffers, not the view/null buffers.
            // https://arrow.apache.org/docs/format/Columnar.html#variadic-buffers
            counts.push(array.buffers().len() as i64 - 1);
        }
        DataType::Dictionary(_, _) => {
            // Do nothing
            // Dictionary types are handled in `encode_dictionaries`.
        }
        _ => {
            for child in array.child_data() {
                append_variadic_buffer_counts(counts, child)
            }
        }
    }
}

pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError> {
    match arr.data_type() {
        DataType::RunEndEncoded(k, _) => match k.data_type() {
            DataType::Int16 => {
                Ok(into_zero_offset_run_array(RunArray::<Int16Type>::from(arr))?.into_data())
            }
            DataType::Int32 => {
                Ok(into_zero_offset_run_array(RunArray::<Int32Type>::from(arr))?.into_data())
            }
            DataType::Int64 => {
                Ok(into_zero_offset_run_array(RunArray::<Int64Type>::from(arr))?.into_data())
            }
            d => unreachable!("Unexpected data type {d}"),
        },
        d => Err(ArrowError::InvalidArgumentError(format!(
            "The given array is not a run array. Data type of given array: {d}"
        ))),
    }
}

// Returns a `RunArray` with zero offset and length matching the last value
// in run_ends array.
fn into_zero_offset_run_array<R: RunEndIndexType>(
    run_array: RunArray<R>,
) -> Result<RunArray<R>, ArrowError> {
    let run_ends = run_array.run_ends();
    if run_ends.offset() == 0 && run_ends.max_value() == run_ends.len() {
        return Ok(run_array);
    }

    // The physical index of the original run_ends array from which the `ArrayData` is sliced.
    let start_physical_index = run_ends.get_start_physical_index();

    // The physical index of the original run_ends array up to which the `ArrayData` is sliced.
    let end_physical_index = run_ends.get_end_physical_index();

    let physical_length = end_physical_index - start_physical_index + 1;

    // build new run_ends array by subtracting offset from run ends.
    let offset = R::Native::usize_as(run_ends.offset());
    let mut builder = BufferBuilder::<R::Native>::new(physical_length);
    for run_end_value in &run_ends.values()[start_physical_index..end_physical_index] {
        builder.append(run_end_value.sub_wrapping(offset));
    }
    builder.append(R::Native::from_usize(run_array.len()).unwrap());
    let new_run_ends = unsafe {
        // Safety:
        // The function builds a valid run_ends array and hence need not be validated.
        ArrayDataBuilder::new(R::DATA_TYPE)
            .len(physical_length)
            .add_buffer(builder.finish())
            .build_unchecked()
    };

    // build new values by slicing physical indices.
    let new_values = run_array
        .values()
        .slice(start_physical_index, physical_length)
        .into_data();

    let builder = ArrayDataBuilder::new(run_array.data_type().clone())
        .len(run_array.len())
        .add_child_data(new_run_ends)
        .add_child_data(new_values);
    let array_data = unsafe {
        // Safety:
        //  This function builds a valid run array and hence can skip validation.
        builder.build_unchecked()
    };
    Ok(array_data.into())
}

/// Keeps track of dictionaries that have been written, to avoid emitting the same dictionary
/// multiple times.
///
/// Can optionally error if an update to an existing dictionary is attempted, which
/// isn't allowed in the `FileWriter`.
#[derive(Debug)]
pub struct DictionaryTracker {
    written: HashMap<i64, ArrayData>,
    dict_ids: Vec<i64>,
    error_on_replacement: bool,
    #[deprecated(
        since = "54.0.0",
        note = "The ability to preserve dictionary IDs will be removed. With it, all fields related to it."
    )]
    preserve_dict_id: bool,
}

impl DictionaryTracker {
    /// Create a new [`DictionaryTracker`].
    ///
    /// If `error_on_replacement`
    /// is true, an error will be generated if an update to an
    /// existing dictionary is attempted.
    ///
    /// Dictionary IDs are assigned automatically during encoding by incrementing
    /// the last seen dictionary ID (or using `0` if no other dictionary IDs have
    /// been seen).
    pub fn new(error_on_replacement: bool) -> Self {
        #[allow(deprecated)]
        Self {
            written: HashMap::new(),
            dict_ids: Vec::new(),
            error_on_replacement,
            preserve_dict_id: false,
        }
    }

    /// Create a new [`DictionaryTracker`].
    ///
    /// If `error_on_replacement`
    /// is true, an error will be generated if an update to an
    /// existing dictionary is attempted.
    #[deprecated(
        since = "54.0.0",
        note = "The ability to preserve dictionary IDs will be removed. With it, all functions related to it."
    )]
    pub fn new_with_preserve_dict_id(error_on_replacement: bool, preserve_dict_id: bool) -> Self {
        #[allow(deprecated)]
        Self {
            written: HashMap::new(),
            dict_ids: Vec::new(),
            error_on_replacement,
            preserve_dict_id,
        }
    }

    /// Set the dictionary ID for `field`.
    ///
    /// If `preserve_dict_id` is true, this will return the `dict_id` in `field` (or panic if `field` does
    /// not have a `dict_id` defined).
    ///
    /// If `preserve_dict_id` is false, this will return the value of the last `dict_id` assigned incremented by 1
    /// or 0 in the case where no dictionary IDs have yet been assigned
    #[deprecated(
        since = "54.0.0",
        note = "The ability to preserve dictionary IDs will be removed. With it, all functions related to it."
    )]
    pub fn set_dict_id(&mut self, field: &Field) -> i64 {
        #[allow(deprecated)]
        let next = if self.preserve_dict_id {
            #[allow(deprecated)]
            field.dict_id().expect("no dict_id in field")
        } else {
            self.dict_ids
                .last()
                .copied()
                .map(|i| i + 1)
                .unwrap_or_default()
        };

        self.dict_ids.push(next);
        next
    }

    /// Return the sequence of dictionary IDs in the order they should be observed while
    /// traversing the schema
    pub fn dict_id(&mut self) -> &[i64] {
        &self.dict_ids
    }

    /// Keep track of the dictionary with the given ID and values. Behavior:
    ///
    /// * If this ID has been written already and has the same data, return `Ok(false)` to indicate
    ///   that the dictionary was not actually inserted (because it's already been seen).
    /// * If this ID has been written already but with different data, and this tracker is
    ///   configured to return an error, return an error.
    /// * If the tracker has not been configured to error on replacement or this dictionary
    ///   has never been seen before, return `Ok(true)` to indicate that the dictionary was just
    ///   inserted.
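    ///
    /// A small sketch of the bookkeeping (the dictionary id `0` here is arbitrary):
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, DictionaryArray, types::Int32Type};
    /// # use arrow_ipc::writer::DictionaryTracker;
    /// let mut tracker = DictionaryTracker::new(true);
    /// let dict: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
    /// let column: ArrayRef = Arc::new(dict);
    /// // First time this id is seen: the dictionary must be emitted.
    /// assert!(tracker.insert(0, &column).unwrap());
    /// // Same id, same values: nothing new to emit.
    /// assert!(!tracker.insert(0, &column).unwrap());
    /// ```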
    pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> {
        let dict_data = column.to_data();
        let dict_values = &dict_data.child_data()[0];

        // If a dictionary with this id was already emitted, check if it was the same.
        if let Some(last) = self.written.get(&dict_id) {
            if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
                // Same dictionary values => no need to emit it again
                return Ok(false);
            }
            if self.error_on_replacement {
                // If error on replacement perform a logical comparison
                if last.child_data()[0] == *dict_values {
                    // Same dictionary values => no need to emit it again
                    return Ok(false);
                }
                return Err(ArrowError::InvalidArgumentError(
                    "Dictionary replacement detected when writing IPC file format. \
                     Arrow IPC files only support a single dictionary for a given field \
                     across all batches."
                        .to_string(),
                ));
            }
        }

        self.written.insert(dict_id, dict_data);
        Ok(true)
    }
}

/// Arrow File Writer
///
/// Writes Arrow [`RecordBatch`]es in the [IPC File Format].
///
/// # See Also
///
/// * [`StreamWriter`] for writing IPC Streams
///
/// # Example
/// ```
/// # use arrow_array::record_batch;
/// # use arrow_ipc::writer::FileWriter;
/// # let mut file = vec![]; // mimic a file for the example
/// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// // create a new writer, the schema must be known in advance
/// let mut writer = FileWriter::try_new(&mut file, &batch.schema()).unwrap();
/// // write each batch to the underlying writer
/// writer.write(&batch).unwrap();
/// // When all batches are written, call finish to flush all buffers
/// writer.finish().unwrap();
/// ```
/// [IPC File Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format
pub struct FileWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// A reference to the schema, used in validating record batches
    schema: SchemaRef,
    /// The number of bytes between each block of bytes, as an offset for random access
    block_offsets: usize,
    /// Dictionary blocks that will be written as part of the IPC footer
    dictionary_blocks: Vec<crate::Block>,
    /// Record blocks that will be written as part of the IPC footer
    record_blocks: Vec<crate::Block>,
    /// Whether the writer footer has been written, and the writer is finished
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,
    /// User level customized metadata
    custom_metadata: HashMap<String, String>,

    data_gen: IpcDataGenerator,
}

impl<W: Write> FileWriter<BufWriter<W>> {
    /// Try to create a new file writer with the writer wrapped in a BufWriter.
    ///
    /// See [`FileWriter::try_new`] for an unbuffered version.
    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        Self::try_new(BufWriter::new(writer), schema)
    }
}

impl<W: Write> FileWriter<W> {
    /// Try to create a new writer, with the schema written as part of the header
    ///
    /// Note the created writer is not buffered. See [`FileWriter::try_new_buffered`] for details.
    ///
    /// # Errors
    ///
    /// An [`Err`](Result::Err) may be returned if writing the header to the writer fails.
    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        let write_options = IpcWriteOptions::default();
        Self::try_new_with_options(writer, schema, write_options)
    }

    /// Try to create a new writer with IpcWriteOptions
    ///
    /// Note the created writer is not buffered. See [`FileWriter::try_new_buffered`] for details.
    ///
    /// # Errors
    ///
    /// An [`Err`](Result::Err) may be returned if writing the header to the writer fails.
    pub fn try_new_with_options(
        mut writer: W,
        schema: &Schema,
        write_options: IpcWriteOptions,
    ) -> Result<Self, ArrowError> {
        let data_gen = IpcDataGenerator::default();
        // write magic to header aligned on alignment boundary
        let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
        let header_size = super::ARROW_MAGIC.len() + pad_len;
        writer.write_all(&super::ARROW_MAGIC)?;
        writer.write_all(&PADDING[..pad_len])?;
        // write the schema, set the written bytes to the schema + header
        #[allow(deprecated)]
        let preserve_dict_id = write_options.preserve_dict_id;
        #[allow(deprecated)]
        let mut dictionary_tracker =
            DictionaryTracker::new_with_preserve_dict_id(true, preserve_dict_id);
        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
            schema,
            &mut dictionary_tracker,
            &write_options,
        );
        let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
        Ok(Self {
            writer,
            write_options,
            schema: Arc::new(schema.clone()),
            block_offsets: meta + data + header_size,
            dictionary_blocks: vec![],
            record_blocks: vec![],
            finished: false,
            dictionary_tracker,
            custom_metadata: HashMap::new(),
            data_gen,
        })
    }

    /// Adds a key-value pair to the [FileWriter]'s custom metadata
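    ///
    /// For example (a sketch; the key and value shown are arbitrary):
    /// ```
    /// # use arrow_array::record_batch;
    /// # use arrow_ipc::writer::FileWriter;
    /// # let mut file = vec![]; // mimic a file for the example
    /// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
    /// let mut writer = FileWriter::try_new(&mut file, &batch.schema()).unwrap();
    /// // Custom metadata is written into the footer when `finish` is called.
    /// writer.write_metadata("created_by", "arrow_ipc example");
    /// writer.write(&batch).unwrap();
    /// writer.finish().unwrap();
    /// ```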
    pub fn write_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
        self.custom_metadata.insert(key.into(), value.into());
    }

    /// Write a record batch to the file
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write record batch to file writer as it is closed".to_string(),
            ));
        }

        let (encoded_dictionaries, encoded_message) = self.data_gen.encoded_batch(
            batch,
            &mut self.dictionary_tracker,
            &self.write_options,
        )?;

        for encoded_dictionary in encoded_dictionaries {
            let (meta, data) =
                write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;

            let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
            self.dictionary_blocks.push(block);
            self.block_offsets += meta + data;
        }

        let (meta, data) = write_message(&mut self.writer, encoded_message, &self.write_options)?;
        // add a record block for the footer
        let block = crate::Block::new(
            self.block_offsets as i64,
            meta as i32, // TODO: is this still applicable?
            data as i64,
        );
        self.record_blocks.push(block);
        self.block_offsets += meta + data;
        Ok(())
    }

    /// Write footer and closing tag, then mark the writer as done
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write footer to file writer as it is closed".to_string(),
            ));
        }

        // write EOS
        write_continuation(&mut self.writer, &self.write_options, 0)?;

        let mut fbb = FlatBufferBuilder::new();
        let dictionaries = fbb.create_vector(&self.dictionary_blocks);
        let record_batches = fbb.create_vector(&self.record_blocks);
        #[allow(deprecated)]
        let preserve_dict_id = self.write_options.preserve_dict_id;
        #[allow(deprecated)]
        let mut dictionary_tracker =
            DictionaryTracker::new_with_preserve_dict_id(true, preserve_dict_id);
        let schema = IpcSchemaEncoder::new()
            .with_dictionary_tracker(&mut dictionary_tracker)
            .schema_to_fb_offset(&mut fbb, &self.schema);
        let fb_custom_metadata = (!self.custom_metadata.is_empty())
            .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));

        let root = {
            let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
            footer_builder.add_version(self.write_options.metadata_version);
            footer_builder.add_schema(schema);
            footer_builder.add_dictionaries(dictionaries);
            footer_builder.add_recordBatches(record_batches);
            if let Some(fb_custom_metadata) = fb_custom_metadata {
                footer_builder.add_custom_metadata(fb_custom_metadata);
            }
            footer_builder.finish()
        };
        fbb.finish(root, None);
        let footer_data = fbb.finished_data();
        self.writer.write_all(footer_data)?;
        self.writer
            .write_all(&(footer_data.len() as i32).to_le_bytes())?;
        self.writer.write_all(&super::ARROW_MAGIC)?;
        self.writer.flush()?;
        self.finished = true;

        Ok(())
    }

    /// Returns the arrow [`SchemaRef`] for this arrow file.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Gets a reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }

    /// Gets a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }

    /// Flush the underlying writer.
    ///
    /// Both the BufWriter and the underlying writer are flushed.
    pub fn flush(&mut self) -> Result<(), ArrowError> {
        self.writer.flush()?;
        Ok(())
    }

    /// Unwraps the underlying writer.
    ///
    /// The writer is flushed and the FileWriter is finished before returning.
    ///
    /// # Errors
    ///
    /// An [`Err`](Result::Err) may be returned if an error occurs while finishing the FileWriter
    /// or while flushing the writer.
    pub fn into_inner(mut self) -> Result<W, ArrowError> {
        if !self.finished {
            // `finish` flushes the writer.
            self.finish()?;
        }
        Ok(self.writer)
    }
}

impl<W: Write> RecordBatchWriter for FileWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}

/// Arrow Stream Writer
///
/// Writes Arrow [`RecordBatch`]es to bytes using the [IPC Streaming Format].
///
/// # See Also
///
/// * [`FileWriter`] for writing IPC Files
///
/// # Example
/// ```
/// # use arrow_array::record_batch;
/// # use arrow_ipc::writer::StreamWriter;
/// # let mut stream = vec![]; // mimic a stream for the example
/// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// // create a new writer, the schema must be known in advance
/// let mut writer = StreamWriter::try_new(&mut stream, &batch.schema()).unwrap();
/// // write each batch to the underlying stream
/// writer.write(&batch).unwrap();
/// // When all batches are written, call finish to flush all buffers
/// writer.finish().unwrap();
/// ```
///
/// [IPC Streaming Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
pub struct StreamWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// Whether the writer footer has been written, and the writer is finished
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,

    data_gen: IpcDataGenerator,
}

impl<W: Write> StreamWriter<BufWriter<W>> {
    /// Try to create a new stream writer with the writer wrapped in a BufWriter.
    ///
    /// See [`StreamWriter::try_new`] for an unbuffered version.
    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        Self::try_new(BufWriter::new(writer), schema)
    }
}

impl<W: Write> StreamWriter<W> {
    /// Try to create a new writer, with the schema written as part of the header.
    ///
    /// Note that there is no internal buffering. See also [`StreamWriter::try_new_buffered`].
    ///
    /// # Errors
    ///
    /// An [`Err`](Result::Err) may be returned if writing the header to the writer fails.
    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        let write_options = IpcWriteOptions::default();
        Self::try_new_with_options(writer, schema, write_options)
    }

    /// Try to create a new writer with [`IpcWriteOptions`].
    ///
    /// # Errors
    ///
    /// An [`Err`](Result::Err) may be returned if writing the header to the writer fails.
    pub fn try_new_with_options(
        mut writer: W,
        schema: &Schema,
        write_options: IpcWriteOptions,
    ) -> Result<Self, ArrowError> {
        let data_gen = IpcDataGenerator::default();
        #[allow(deprecated)]
        let preserve_dict_id = write_options.preserve_dict_id;
        #[allow(deprecated)]
        let mut dictionary_tracker =
            DictionaryTracker::new_with_preserve_dict_id(false, preserve_dict_id);

        // write the schema, set the written bytes to the schema
        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
            schema,
            &mut dictionary_tracker,
            &write_options,
        );
        write_message(&mut writer, encoded_message, &write_options)?;
        Ok(Self {
            writer,
            write_options,
            finished: false,
            dictionary_tracker,
            data_gen,
        })
    }

    /// Write a record batch to the stream
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write record batch to stream writer as it is closed".to_string(),
            ));
        }

        let (encoded_dictionaries, encoded_message) = self
            .data_gen
            .encoded_batch(batch, &mut self.dictionary_tracker, &self.write_options)
            .expect("StreamWriter is configured to not error on dictionary replacement");

        for encoded_dictionary in encoded_dictionaries {
            write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
        }

        write_message(&mut self.writer, encoded_message, &self.write_options)?;
        Ok(())
    }

    /// Write continuation bytes, and mark the stream as done
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write footer to stream writer as it is closed".to_string(),
            ));
        }

        write_continuation(&mut self.writer, &self.write_options, 0)?;

        self.finished = true;

        Ok(())
    }

    /// Gets a reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }

    /// Gets a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }

    /// Flush the underlying writer.
    ///
    /// Both the BufWriter and the underlying writer are flushed.
    pub fn flush(&mut self) -> Result<(), ArrowError> {
        self.writer.flush()?;
        Ok(())
    }

    /// Unwraps the underlying writer.
    ///
    /// The writer is flushed and the StreamWriter is finished before returning.
    ///
    /// # Errors
    ///
    /// An [`Err`](Result::Err) may be returned if an error occurs while finishing the StreamWriter
    /// or while flushing the writer.
    ///
    /// # Example
    ///
    /// ```
    /// # use arrow_ipc::writer::{StreamWriter, IpcWriteOptions};
    /// # use arrow_ipc::MetadataVersion;
    /// # use arrow_schema::{ArrowError, Schema};
    /// # fn main() -> Result<(), ArrowError> {
    /// // The result we expect from an empty schema
    /// let expected = vec![
    ///     255, 255, 255, 255,  48,   0,   0,   0,
    ///      16,   0,   0,   0,   0,   0,  10,   0,
    ///      12,   0,  10,   0,   9,   0,   4,   0,
    ///      10,   0,   0,   0,  16,   0,   0,   0,
    ///       0,   1,   4,   0,   8,   0,   8,   0,
    ///       0,   0,   4,   0,   8,   0,   0,   0,
    ///       4,   0,   0,   0,   0,   0,   0,   0,
    ///     255, 255, 255, 255,   0,   0,   0,   0
    /// ];
    ///
    /// let schema = Schema::empty();
    /// let buffer: Vec<u8> = Vec::new();
    /// let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)?;
    /// let stream_writer = StreamWriter::try_new_with_options(buffer, &schema, options)?;
    ///
    /// assert_eq!(stream_writer.into_inner()?, expected);
    /// # Ok(())
    /// # }
    /// ```
    pub fn into_inner(mut self) -> Result<W, ArrowError> {
        if !self.finished {
            // `finish` flushes.
            self.finish()?;
        }
        Ok(self.writer)
    }
}

impl<W: Write> RecordBatchWriter for StreamWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}

/// Stores the encoded data, which is a crate::Message, and optional Arrow data
pub struct EncodedData {
    /// An encoded crate::Message
    pub ipc_message: Vec<u8>,
    /// Arrow buffers to be written, should be an empty vec for schema messages
    pub arrow_data: Vec<u8>,
}

/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
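///
/// A sketch of writing an encoded schema message to an in-memory buffer (the
/// field name and type below are arbitrary):
/// ```
/// # use arrow_ipc::writer::{write_message, DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
/// # use arrow_schema::{DataType, Field, Schema};
/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
/// let options = IpcWriteOptions::default();
/// let mut tracker = DictionaryTracker::new(false);
/// let encoded = IpcDataGenerator::default()
///     .schema_to_bytes_with_dictionary_tracker(&schema, &mut tracker, &options);
/// let mut buffer = vec![];
/// let (meta_len, body_len) = write_message(&mut buffer, encoded, &options).unwrap();
/// assert!(meta_len > 0);
/// assert_eq!(body_len, 0); // schema messages have no body
/// ```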
pub fn write_message<W: Write>(
    mut writer: W,
    encoded: EncodedData,
    write_options: &IpcWriteOptions,
) -> Result<(usize, usize), ArrowError> {
    let arrow_data_len = encoded.arrow_data.len();
    if arrow_data_len % usize::from(write_options.alignment) != 0 {
        return Err(ArrowError::MemoryError(
            "Arrow data not aligned".to_string(),
        ));
    }

    let a = usize::from(write_options.alignment - 1);
    let buffer = encoded.ipc_message;
    let flatbuf_size = buffer.len();
    let prefix_size = if write_options.write_legacy_ipc_format {
        4
    } else {
        8
    };
    let aligned_size = (flatbuf_size + prefix_size + a) & !a;
    let padding_bytes = aligned_size - flatbuf_size - prefix_size;

    write_continuation(
        &mut writer,
        write_options,
        (aligned_size - prefix_size) as i32,
    )?;

    // write the flatbuf
    if flatbuf_size > 0 {
        writer.write_all(&buffer)?;
    }
    // write padding
    writer.write_all(&PADDING[..padding_bytes])?;

    // write arrow data
    let body_len = if arrow_data_len > 0 {
        write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
    } else {
        0
    };

    Ok((aligned_size, body_len))
}

fn write_body_buffers<W: Write>(
    mut writer: W,
    data: &[u8],
    alignment: u8,
) -> Result<usize, ArrowError> {
    let len = data.len();
    let pad_len = pad_to_alignment(alignment, len);
    let total_len = len + pad_len;

    // write body buffer
    writer.write_all(data)?;
    if pad_len > 0 {
        writer.write_all(&PADDING[..pad_len])?;
    }

    writer.flush()?;
    Ok(total_len)
}

1439    /// Write the continuation marker (when the format requires one) and the message
1440    /// length prefix to the writer, ahead of the message body itself
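///
/// Illustrative non-legacy layout, assuming the padded flatbuffer message occupies
/// 24 bytes: `FF FF FF FF` (continuation marker) followed by `18 00 00 00` (the
/// length, little-endian), then the flatbuffer bytes themselves.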
1441fn write_continuation<W: Write>(
1442    mut writer: W,
1443    write_options: &IpcWriteOptions,
1444    total_len: i32,
1445) -> Result<usize, ArrowError> {
1446    let mut written = 8;
1447
1448    // the version of the writer determines whether continuation markers should be added
1449    match write_options.metadata_version {
1450        crate::MetadataVersion::V1 | crate::MetadataVersion::V2 | crate::MetadataVersion::V3 => {
1451            unreachable!("IpcWriteOptions cannot be created with metadata version V1, V2 or V3")
1452        }
1453        crate::MetadataVersion::V4 => {
1454            if !write_options.write_legacy_ipc_format {
1455                // v0.15.0 format
1456                writer.write_all(&CONTINUATION_MARKER)?;
1457                written = 4;
1458            }
1459            writer.write_all(&total_len.to_le_bytes()[..])?;
1460        }
1461        crate::MetadataVersion::V5 => {
1462            // write continuation marker and message length
1463            writer.write_all(&CONTINUATION_MARKER)?;
1464            writer.write_all(&total_len.to_le_bytes()[..])?;
1465        }
1466        z => panic!("Unsupported crate::MetadataVersion {z:?}"),
1467    };
1468
1469    writer.flush()?;
1470
1471    Ok(written)
1472}
1473
1474/// In V4, null types have no validity bitmap
1475/// In V5 and later, null and union types have no validity bitmap
1476/// Run end encoded type has no validity bitmap.
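///
/// For example (illustrative): an `Int32` column has a validity bitmap under both V4
/// and V5, while a union column has one under V4 but not under V5.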
1477fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) -> bool {
1478    if write_options.metadata_version < crate::MetadataVersion::V5 {
1479        !matches!(data_type, DataType::Null)
1480    } else {
1481        !matches!(
1482            data_type,
1483            DataType::Null | DataType::Union(_, _) | DataType::RunEndEncoded(_, _)
1484        )
1485    }
1486}
1487
1488/// Whether to truncate the buffer
1489#[inline]
1490fn buffer_need_truncate(
1491    array_offset: usize,
1492    buffer: &Buffer,
1493    spec: &BufferSpec,
1494    min_length: usize,
1495) -> bool {
1496    spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
1497}
1498
1499    /// Returns the byte width for a buffer spec, or 0 for specs other than `BufferSpec::FixedWidth`.
1500#[inline]
1501fn get_buffer_element_width(spec: &BufferSpec) -> usize {
1502    match spec {
1503        BufferSpec::FixedWidth { byte_width, .. } => *byte_width,
1504        _ => 0,
1505    }
1506}
1507
1508/// Common functionality for re-encoding offsets. Returns the new offsets as well as
1509/// original start offset and length for use in slicing child data.
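///
/// For example (illustrative): given offsets `[0, 3, 6, 9]` and an array sliced to
/// `offset = 1`, `len = 2`, the rebased offsets are `[0, 3, 6]` and the returned child
/// start offset and length are `3` and `6`.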
1510fn reencode_offsets<O: OffsetSizeTrait>(
1511    offsets: &Buffer,
1512    data: &ArrayData,
1513) -> (Buffer, usize, usize) {
1514    let offsets_slice: &[O] = offsets.typed_data::<O>();
1515    let offset_slice = &offsets_slice[data.offset()..data.offset() + data.len() + 1];
1516
1517    let start_offset = offset_slice.first().unwrap();
1518    let end_offset = offset_slice.last().unwrap();
1519
1520    let offsets = match start_offset.as_usize() {
1521        0 => {
1522            let size = size_of::<O>();
1523            offsets.slice_with_length(data.offset() * size, (data.len() + 1) * size)
1524        }
1525        _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
1526    };
1527
1528    let start_offset = start_offset.as_usize();
1529    let end_offset = end_offset.as_usize();
1530
1531    (offsets, start_offset, end_offset - start_offset)
1532}
1533
1534/// Returns the values and offsets [`Buffer`] for a ByteArray with offset type `O`
1535///
1536/// In particular, this handles re-encoding the offsets if they don't start at `0`,
1537/// slicing the values buffer as appropriate. This helps reduce the encoded
1538/// size of sliced arrays, as values that have been sliced away are not encoded
1539fn get_byte_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Buffer) {
1540    if data.is_empty() {
1541        return (MutableBuffer::new(0).into(), MutableBuffer::new(0).into());
1542    }
1543
1544    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1545    let values = data.buffers()[1].slice_with_length(original_start_offset, len);
1546    (offsets, values)
1547}
1548
1549/// Similar logic as [`get_byte_array_buffers()`] but slices the child array instead
1550/// of a values buffer.
1551fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, ArrayData) {
1552    if data.is_empty() {
1553        return (
1554            MutableBuffer::new(0).into(),
1555            data.child_data()[0].slice(0, 0),
1556        );
1557    }
1558
1559    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1560    let child_data = data.child_data()[0].slice(original_start_offset, len);
1561    (offsets, child_data)
1562}
1563
1564/// Write array data to a vector of bytes
1565#[allow(clippy::too_many_arguments)]
1566fn write_array_data(
1567    array_data: &ArrayData,
1568    buffers: &mut Vec<crate::Buffer>,
1569    arrow_data: &mut Vec<u8>,
1570    nodes: &mut Vec<crate::FieldNode>,
1571    offset: i64,
1572    num_rows: usize,
1573    null_count: usize,
1574    compression_codec: Option<CompressionCodec>,
1575    write_options: &IpcWriteOptions,
1576) -> Result<i64, ArrowError> {
1577    let mut offset = offset;
1578    if !matches!(array_data.data_type(), DataType::Null) {
1579        nodes.push(crate::FieldNode::new(num_rows as i64, null_count as i64));
1580    } else {
1581        // A NullArray's null_count equals its len, but the `null_count` passed in comes from ArrayData,
1582        // where it is always 0.
1583        nodes.push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
1584    }
1585    if has_validity_bitmap(array_data.data_type(), write_options) {
1586        // write null buffer if exists
1587        let null_buffer = match array_data.nulls() {
1588            None => {
1589                // create a buffer and fill it with valid bits
1590                let num_bytes = bit_util::ceil(num_rows, 8);
1591                let buffer = MutableBuffer::new(num_bytes);
1592                let buffer = buffer.with_bitset(num_bytes, true);
1593                buffer.into()
1594            }
1595            Some(buffer) => buffer.inner().sliced(),
1596        };
1597
1598        offset = write_buffer(
1599            null_buffer.as_slice(),
1600            buffers,
1601            arrow_data,
1602            offset,
1603            compression_codec,
1604            write_options.alignment,
1605        )?;
1606    }
1607
1608    let data_type = array_data.data_type();
1609    if matches!(data_type, DataType::Binary | DataType::Utf8) {
1610        let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
1611        for buffer in [offsets, values] {
1612            offset = write_buffer(
1613                buffer.as_slice(),
1614                buffers,
1615                arrow_data,
1616                offset,
1617                compression_codec,
1618                write_options.alignment,
1619            )?;
1620        }
1621    } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
1622        // Slicing the views buffer is safe and easy,
1623        // but pruning unneeded data buffers is much more nuanced since it's complicated to prove that no views reference the pruned buffers
1624        //
1625        // The current implementation just serializes the raw arrays as given and does not try to optimize anything.
1626        // If users want to "compact" the arrays prior to sending them over IPC,
1627        // they should consider the gc API suggested in #5513
1628        for buffer in array_data.buffers() {
1629            offset = write_buffer(
1630                buffer.as_slice(),
1631                buffers,
1632                arrow_data,
1633                offset,
1634                compression_codec,
1635                write_options.alignment,
1636            )?;
1637        }
1638    } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
1639        let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
1640        for buffer in [offsets, values] {
1641            offset = write_buffer(
1642                buffer.as_slice(),
1643                buffers,
1644                arrow_data,
1645                offset,
1646                compression_codec,
1647                write_options.alignment,
1648            )?;
1649        }
1650    } else if DataType::is_numeric(data_type)
1651        || DataType::is_temporal(data_type)
1652        || matches!(
1653            array_data.data_type(),
1654            DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
1655        )
1656    {
1657        // Truncate values
1658        assert_eq!(array_data.buffers().len(), 1);
1659
1660        let buffer = &array_data.buffers()[0];
1661        let layout = layout(data_type);
1662        let spec = &layout.buffers[0];
1663
1664        let byte_width = get_buffer_element_width(spec);
1665        let min_length = array_data.len() * byte_width;
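        // For example (illustrative): an Int32 array sliced to offset 2, len 3 has a
        // byte_width of 4, so only bytes [8, 20) of the original values buffer are written.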
1666        let buffer_slice = if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
1667            let byte_offset = array_data.offset() * byte_width;
1668            let buffer_length = min(min_length, buffer.len() - byte_offset);
1669            &buffer.as_slice()[byte_offset..(byte_offset + buffer_length)]
1670        } else {
1671            buffer.as_slice()
1672        };
1673        offset = write_buffer(
1674            buffer_slice,
1675            buffers,
1676            arrow_data,
1677            offset,
1678            compression_codec,
1679            write_options.alignment,
1680        )?;
1681    } else if matches!(data_type, DataType::Boolean) {
1682        // Bools are special because the payload (= 1 bit) is smaller than the physical container elements (= bytes).
1683        // The array data may not start at the physical boundary of the underlying buffer, so we need to shift bits around.
1684        assert_eq!(array_data.buffers().len(), 1);
1685
1686        let buffer = &array_data.buffers()[0];
1687        let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
1688        offset = write_buffer(
1689            &buffer,
1690            buffers,
1691            arrow_data,
1692            offset,
1693            compression_codec,
1694            write_options.alignment,
1695        )?;
1696    } else if matches!(
1697        data_type,
1698        DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _)
1699    ) {
1700        assert_eq!(array_data.buffers().len(), 1);
1701        assert_eq!(array_data.child_data().len(), 1);
1702
1703        // Truncate offsets and the child data to avoid writing unnecessary data
1704        let (offsets, sliced_child_data) = match data_type {
1705            DataType::List(_) => get_list_array_buffers::<i32>(array_data),
1706            DataType::Map(_, _) => get_list_array_buffers::<i32>(array_data),
1707            DataType::LargeList(_) => get_list_array_buffers::<i64>(array_data),
1708            _ => unreachable!(),
1709        };
1710        offset = write_buffer(
1711            offsets.as_slice(),
1712            buffers,
1713            arrow_data,
1714            offset,
1715            compression_codec,
1716            write_options.alignment,
1717        )?;
1718        offset = write_array_data(
1719            &sliced_child_data,
1720            buffers,
1721            arrow_data,
1722            nodes,
1723            offset,
1724            sliced_child_data.len(),
1725            sliced_child_data.null_count(),
1726            compression_codec,
1727            write_options,
1728        )?;
1729        return Ok(offset);
1730    } else if let DataType::FixedSizeList(_, fixed_size) = data_type {
1731        assert_eq!(array_data.child_data().len(), 1);
1732        let fixed_size = *fixed_size as usize;
1733
1734        let child_offset = array_data.offset() * fixed_size;
1735        let child_length = array_data.len() * fixed_size;
1736        let child_data = array_data.child_data()[0].slice(child_offset, child_length);
1737
1738        offset = write_array_data(
1739            &child_data,
1740            buffers,
1741            arrow_data,
1742            nodes,
1743            offset,
1744            child_data.len(),
1745            child_data.null_count(),
1746            compression_codec,
1747            write_options,
1748        )?;
1749        return Ok(offset);
1750    } else {
1751        for buffer in array_data.buffers() {
1752            offset = write_buffer(
1753                buffer,
1754                buffers,
1755                arrow_data,
1756                offset,
1757                compression_codec,
1758                write_options.alignment,
1759            )?;
1760        }
1761    }
1762
1763    match array_data.data_type() {
1764        DataType::Dictionary(_, _) => {}
1765        DataType::RunEndEncoded(_, _) => {
1766            // unslice the run encoded array.
1767            let arr = unslice_run_array(array_data.clone())?;
1768            // recursively write out nested structures
1769            for data_ref in arr.child_data() {
1770                // write the nested data (e.g. list data)
1771                offset = write_array_data(
1772                    data_ref,
1773                    buffers,
1774                    arrow_data,
1775                    nodes,
1776                    offset,
1777                    data_ref.len(),
1778                    data_ref.null_count(),
1779                    compression_codec,
1780                    write_options,
1781                )?;
1782            }
1783        }
1784        _ => {
1785            // recursively write out nested structures
1786            for data_ref in array_data.child_data() {
1787                // write the nested data (e.g. list data)
1788                offset = write_array_data(
1789                    data_ref,
1790                    buffers,
1791                    arrow_data,
1792                    nodes,
1793                    offset,
1794                    data_ref.len(),
1795                    data_ref.null_count(),
1796                    compression_codec,
1797                    write_options,
1798                )?;
1799            }
1800        }
1801    }
1802    Ok(offset)
1803}
1804
1805    /// Writes a buffer into `arrow_data`, a vector of bytes, and adds its
1806    /// [`crate::Buffer`] to `buffers`. Returns the new offset in `arrow_data`
1807    ///
1809/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
1810/// Each constituent buffer is first compressed with the indicated
1811/// compressor, and then written with the uncompressed length in the first 8
1812/// bytes as a 64-bit little-endian signed integer followed by the compressed
1813/// buffer bytes (and then padding as required by the protocol). The
1814/// uncompressed length may be set to -1 to indicate that the data that
1815/// follows is not compressed, which can be useful for cases where
1816/// compression does not yield appreciable savings.
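///
/// Illustratively: when a codec is configured, each buffer in the body is laid out as
/// the 8-byte uncompressed length (or `-1` if the data is left uncompressed), then the
/// buffer bytes, then padding; with no codec, only the raw bytes plus padding are written.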
1817fn write_buffer(
1818    buffer: &[u8],                    // input
1819    buffers: &mut Vec<crate::Buffer>, // output buffer descriptors
1820    arrow_data: &mut Vec<u8>,         // output stream
1821    offset: i64,                      // current output stream offset
1822    compression_codec: Option<CompressionCodec>,
1823    alignment: u8,
1824) -> Result<i64, ArrowError> {
1825    let len: i64 = match compression_codec {
1826        Some(compressor) => compressor.compress_to_vec(buffer, arrow_data)?,
1827        None => {
1828            arrow_data.extend_from_slice(buffer);
1829            buffer.len()
1830        }
1831    }
1832    .try_into()
1833    .map_err(|e| {
1834        ArrowError::InvalidArgumentError(format!("Could not convert compressed size to i64: {e}"))
1835    })?;
1836
1837    // make new index entry
1838    buffers.push(crate::Buffer::new(offset, len));
1839    // write padding so the returned offset stays aligned
1840    let pad_len = pad_to_alignment(alignment, len as usize);
1841    arrow_data.extend_from_slice(&PADDING[..pad_len]);
1842
1843    Ok(offset + len + (pad_len as i64))
1844}
1845
1846const PADDING: [u8; 64] = [0; 64];
1847
1848    /// Returns the number of bytes needed to pad `len` up to the next multiple of `alignment`
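///
/// For example (illustrative): with `alignment = 8`, a length of 13 needs 3 bytes of
/// padding (13 is rounded up to 16), while a length of 16 needs none.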
1849#[inline]
1850fn pad_to_alignment(alignment: u8, len: usize) -> usize {
1851    let a = usize::from(alignment - 1);
1852    ((len + a) & !a) - len
1853}
1854
1855#[cfg(test)]
1856mod tests {
1857    use std::hash::Hasher;
1858    use std::io::Cursor;
1859    use std::io::Seek;
1860
1861    use arrow_array::builder::FixedSizeListBuilder;
1862    use arrow_array::builder::Float32Builder;
1863    use arrow_array::builder::Int64Builder;
1864    use arrow_array::builder::MapBuilder;
1865    use arrow_array::builder::UnionBuilder;
1866    use arrow_array::builder::{GenericListBuilder, ListBuilder, StringBuilder};
1867    use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
1868    use arrow_array::types::*;
1869    use arrow_buffer::ScalarBuffer;
1870
1871    use crate::convert::fb_to_schema;
1872    use crate::reader::*;
1873    use crate::root_as_footer;
1874    use crate::MetadataVersion;
1875
1876    use super::*;
1877
1878    fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
1879        let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
1880        writer.write(rb).unwrap();
1881        writer.finish().unwrap();
1882        writer.into_inner().unwrap()
1883    }
1884
1885    fn deserialize_file(bytes: Vec<u8>) -> RecordBatch {
1886        let mut reader = FileReader::try_new(Cursor::new(bytes), None).unwrap();
1887        reader.next().unwrap().unwrap()
1888    }
1889
1890    fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
1891        // Use 8-byte alignment so that the various `truncate_*` tests can be compactly written,
1892        // without needing to construct a giant array to spill over the 64-byte default alignment
1893        // boundary.
1894        const IPC_ALIGNMENT: usize = 8;
1895
1896        let mut stream_writer = StreamWriter::try_new_with_options(
1897            vec![],
1898            record.schema_ref(),
1899            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
1900        )
1901        .unwrap();
1902        stream_writer.write(record).unwrap();
1903        stream_writer.finish().unwrap();
1904        stream_writer.into_inner().unwrap()
1905    }
1906
1907    fn deserialize_stream(bytes: Vec<u8>) -> RecordBatch {
1908        let mut stream_reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
1909        stream_reader.next().unwrap().unwrap()
1910    }
1911
1912    #[test]
1913    #[cfg(feature = "lz4")]
1914    fn test_write_empty_record_batch_lz4_compression() {
1915        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
1916        let values: Vec<Option<i32>> = vec![];
1917        let array = Int32Array::from(values);
1918        let record_batch =
1919            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
1920
1921        let mut file = tempfile::tempfile().unwrap();
1922
1923        {
1924            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
1925                .unwrap()
1926                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
1927                .unwrap();
1928
1929            let mut writer =
1930                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
1931            writer.write(&record_batch).unwrap();
1932            writer.finish().unwrap();
1933        }
1934        file.rewind().unwrap();
1935        {
1936            // read file
1937            let reader = FileReader::try_new(file, None).unwrap();
1938            for read_batch in reader {
1939                read_batch
1940                    .unwrap()
1941                    .columns()
1942                    .iter()
1943                    .zip(record_batch.columns())
1944                    .for_each(|(a, b)| {
1945                        assert_eq!(a.data_type(), b.data_type());
1946                        assert_eq!(a.len(), b.len());
1947                        assert_eq!(a.null_count(), b.null_count());
1948                    });
1949            }
1950        }
1951    }
1952
1953    #[test]
1954    #[cfg(feature = "lz4")]
1955    fn test_write_file_with_lz4_compression() {
1956        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
1957        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
1958        let array = Int32Array::from(values);
1959        let record_batch =
1960            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
1961
1962        let mut file = tempfile::tempfile().unwrap();
1963        {
1964            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
1965                .unwrap()
1966                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
1967                .unwrap();
1968
1969            let mut writer =
1970                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
1971            writer.write(&record_batch).unwrap();
1972            writer.finish().unwrap();
1973        }
1974        file.rewind().unwrap();
1975        {
1976            // read file
1977            let reader = FileReader::try_new(file, None).unwrap();
1978            for read_batch in reader {
1979                read_batch
1980                    .unwrap()
1981                    .columns()
1982                    .iter()
1983                    .zip(record_batch.columns())
1984                    .for_each(|(a, b)| {
1985                        assert_eq!(a.data_type(), b.data_type());
1986                        assert_eq!(a.len(), b.len());
1987                        assert_eq!(a.null_count(), b.null_count());
1988                    });
1989            }
1990        }
1991    }
1992
1993    #[test]
1994    #[cfg(feature = "zstd")]
1995    fn test_write_file_with_zstd_compression() {
1996        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
1997        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
1998        let array = Int32Array::from(values);
1999        let record_batch =
2000            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2001        let mut file = tempfile::tempfile().unwrap();
2002        {
2003            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2004                .unwrap()
2005                .try_with_compression(Some(crate::CompressionType::ZSTD))
2006                .unwrap();
2007
2008            let mut writer =
2009                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2010            writer.write(&record_batch).unwrap();
2011            writer.finish().unwrap();
2012        }
2013        file.rewind().unwrap();
2014        {
2015            // read file
2016            let reader = FileReader::try_new(file, None).unwrap();
2017            for read_batch in reader {
2018                read_batch
2019                    .unwrap()
2020                    .columns()
2021                    .iter()
2022                    .zip(record_batch.columns())
2023                    .for_each(|(a, b)| {
2024                        assert_eq!(a.data_type(), b.data_type());
2025                        assert_eq!(a.len(), b.len());
2026                        assert_eq!(a.null_count(), b.null_count());
2027                    });
2028            }
2029        }
2030    }
2031
2032    #[test]
2033    fn test_write_file() {
2034        let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
2035        let values: Vec<Option<u32>> = vec![
2036            Some(999),
2037            None,
2038            Some(235),
2039            Some(123),
2040            None,
2041            None,
2042            None,
2043            None,
2044            None,
2045        ];
2046        let array1 = UInt32Array::from(values);
2047        let batch =
2048            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array1) as ArrayRef])
2049                .unwrap();
2050        let mut file = tempfile::tempfile().unwrap();
2051        {
2052            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2053
2054            writer.write(&batch).unwrap();
2055            writer.finish().unwrap();
2056        }
2057        file.rewind().unwrap();
2058
2059        {
2060            let mut reader = FileReader::try_new(file, None).unwrap();
2061            while let Some(Ok(read_batch)) = reader.next() {
2062                read_batch
2063                    .columns()
2064                    .iter()
2065                    .zip(batch.columns())
2066                    .for_each(|(a, b)| {
2067                        assert_eq!(a.data_type(), b.data_type());
2068                        assert_eq!(a.len(), b.len());
2069                        assert_eq!(a.null_count(), b.null_count());
2070                    });
2071            }
2072        }
2073    }
2074
2075    fn write_null_file(options: IpcWriteOptions) {
2076        let schema = Schema::new(vec![
2077            Field::new("nulls", DataType::Null, true),
2078            Field::new("int32s", DataType::Int32, false),
2079            Field::new("nulls2", DataType::Null, true),
2080            Field::new("f64s", DataType::Float64, false),
2081        ]);
2082        let array1 = NullArray::new(32);
2083        let array2 = Int32Array::from(vec![1; 32]);
2084        let array3 = NullArray::new(32);
2085        let array4 = Float64Array::from(vec![f64::NAN; 32]);
2086        let batch = RecordBatch::try_new(
2087            Arc::new(schema.clone()),
2088            vec![
2089                Arc::new(array1) as ArrayRef,
2090                Arc::new(array2) as ArrayRef,
2091                Arc::new(array3) as ArrayRef,
2092                Arc::new(array4) as ArrayRef,
2093            ],
2094        )
2095        .unwrap();
2096        let mut file = tempfile::tempfile().unwrap();
2097        {
2098            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2099
2100            writer.write(&batch).unwrap();
2101            writer.finish().unwrap();
2102        }
2103
2104        file.rewind().unwrap();
2105
2106        {
2107            let reader = FileReader::try_new(file, None).unwrap();
2108            reader.for_each(|maybe_batch| {
2109                maybe_batch
2110                    .unwrap()
2111                    .columns()
2112                    .iter()
2113                    .zip(batch.columns())
2114                    .for_each(|(a, b)| {
2115                        assert_eq!(a.data_type(), b.data_type());
2116                        assert_eq!(a.len(), b.len());
2117                        assert_eq!(a.null_count(), b.null_count());
2118                    });
2119            });
2120        }
2121    }
2122    #[test]
2123    fn test_write_null_file_v4() {
2124        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2125        write_null_file(IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap());
2126        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V4).unwrap());
2127        write_null_file(IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap());
2128    }
2129
2130    #[test]
2131    fn test_write_null_file_v5() {
2132        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2133        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V5).unwrap());
2134    }
2135
2136    #[test]
2137    fn track_union_nested_dict() {
2138        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2139
2140        let array = Arc::new(inner) as ArrayRef;
2141
2142        // Dict field with id 2
2143        #[allow(deprecated)]
2144        let dctfield = Field::new_dict("dict", array.data_type().clone(), false, 2, false);
2145        let union_fields = [(0, Arc::new(dctfield))].into_iter().collect();
2146
2147        let types = [0, 0, 0].into_iter().collect::<ScalarBuffer<i8>>();
2148        let offsets = [0, 1, 2].into_iter().collect::<ScalarBuffer<i32>>();
2149
2150        let union = UnionArray::try_new(union_fields, types, Some(offsets), vec![array]).unwrap();
2151
2152        let schema = Arc::new(Schema::new(vec![Field::new(
2153            "union",
2154            union.data_type().clone(),
2155            false,
2156        )]));
2157
2158        let batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
2159
2160        let gen = IpcDataGenerator {};
2161        #[allow(deprecated)]
2162        let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
2163        gen.encoded_batch(&batch, &mut dict_tracker, &Default::default())
2164            .unwrap();
2165
2166        // The tracker was created with `preserve_dict_id` enabled, so the dictionary is
2167        // tracked under the dict ID declared in the schema (2) rather than a generated one
2168        assert!(dict_tracker.written.contains_key(&2));
2169    }
2170
2171    #[test]
2172    fn track_struct_nested_dict() {
2173        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2174
2175        let array = Arc::new(inner) as ArrayRef;
2176
2177        // Dict field with id 2
2178        #[allow(deprecated)]
2179        let dctfield = Arc::new(Field::new_dict(
2180            "dict",
2181            array.data_type().clone(),
2182            false,
2183            2,
2184            false,
2185        ));
2186
2187        let s = StructArray::from(vec![(dctfield, array)]);
2188        let struct_array = Arc::new(s) as ArrayRef;
2189
2190        let schema = Arc::new(Schema::new(vec![Field::new(
2191            "struct",
2192            struct_array.data_type().clone(),
2193            false,
2194        )]));
2195
2196        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2197
2198        let gen = IpcDataGenerator {};
2199        #[allow(deprecated)]
2200        let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
2201        gen.encoded_batch(&batch, &mut dict_tracker, &Default::default())
2202            .unwrap();
2203
2204        assert!(dict_tracker.written.contains_key(&2));
2205    }
2206
2207    fn write_union_file(options: IpcWriteOptions) {
2208        let schema = Schema::new(vec![Field::new_union(
2209            "union",
2210            vec![0, 1],
2211            vec![
2212                Field::new("a", DataType::Int32, false),
2213                Field::new("c", DataType::Float64, false),
2214            ],
2215            UnionMode::Sparse,
2216        )]);
2217        let mut builder = UnionBuilder::with_capacity_sparse(5);
2218        builder.append::<Int32Type>("a", 1).unwrap();
2219        builder.append_null::<Int32Type>("a").unwrap();
2220        builder.append::<Float64Type>("c", 3.0).unwrap();
2221        builder.append_null::<Float64Type>("c").unwrap();
2222        builder.append::<Int32Type>("a", 4).unwrap();
2223        let union = builder.build().unwrap();
2224
2225        let batch =
2226            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(union) as ArrayRef])
2227                .unwrap();
2228
2229        let mut file = tempfile::tempfile().unwrap();
2230        {
2231            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2232
2233            writer.write(&batch).unwrap();
2234            writer.finish().unwrap();
2235        }
2236        file.rewind().unwrap();
2237
2238        {
2239            let reader = FileReader::try_new(file, None).unwrap();
2240            reader.for_each(|maybe_batch| {
2241                maybe_batch
2242                    .unwrap()
2243                    .columns()
2244                    .iter()
2245                    .zip(batch.columns())
2246                    .for_each(|(a, b)| {
2247                        assert_eq!(a.data_type(), b.data_type());
2248                        assert_eq!(a.len(), b.len());
2249                        assert_eq!(a.null_count(), b.null_count());
2250                    });
2251            });
2252        }
2253    }
2254
2255    #[test]
2256    fn test_write_union_file_v4_v5() {
2257        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2258        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2259    }
2260
2261    #[test]
2262    fn test_write_view_types() {
2263        const LONG_TEST_STRING: &str =
2264            "This is a long string to make sure binary view array handles it";
2265        let schema = Schema::new(vec![
2266            Field::new("field1", DataType::BinaryView, true),
2267            Field::new("field2", DataType::Utf8View, true),
2268        ]);
2269        let values: Vec<Option<&[u8]>> = vec![
2270            Some(b"foo"),
2271            Some(b"bar"),
2272            Some(LONG_TEST_STRING.as_bytes()),
2273        ];
2274        let binary_array = BinaryViewArray::from_iter(values);
2275        let utf8_array =
2276            StringViewArray::from_iter(vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]);
2277        let record_batch = RecordBatch::try_new(
2278            Arc::new(schema.clone()),
2279            vec![Arc::new(binary_array), Arc::new(utf8_array)],
2280        )
2281        .unwrap();
2282
2283        let mut file = tempfile::tempfile().unwrap();
2284        {
2285            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2286            writer.write(&record_batch).unwrap();
2287            writer.finish().unwrap();
2288        }
2289        file.rewind().unwrap();
2290        {
2291            let mut reader = FileReader::try_new(&file, None).unwrap();
2292            let read_batch = reader.next().unwrap().unwrap();
2293            read_batch
2294                .columns()
2295                .iter()
2296                .zip(record_batch.columns())
2297                .for_each(|(a, b)| {
2298                    assert_eq!(a, b);
2299                });
2300        }
2301        file.rewind().unwrap();
2302        {
2303            let mut reader = FileReader::try_new(&file, Some(vec![0])).unwrap();
2304            let read_batch = reader.next().unwrap().unwrap();
2305            assert_eq!(read_batch.num_columns(), 1);
2306            let read_array = read_batch.column(0);
2307            let write_array = record_batch.column(0);
2308            assert_eq!(read_array, write_array);
2309        }
2310    }
2311
2312    #[test]
2313    fn truncate_ipc_record_batch() {
2314        fn create_batch(rows: usize) -> RecordBatch {
2315            let schema = Schema::new(vec![
2316                Field::new("a", DataType::Int32, false),
2317                Field::new("b", DataType::Utf8, false),
2318            ]);
2319
2320            let a = Int32Array::from_iter_values(0..rows as i32);
2321            let b = StringArray::from_iter_values((0..rows).map(|i| i.to_string()));
2322
2323            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
2324        }
2325
2326        let big_record_batch = create_batch(65536);
2327
2328        let length = 5;
2329        let small_record_batch = create_batch(length);
2330
2331        let offset = 2;
2332        let record_batch_slice = big_record_batch.slice(offset, length);
2333        assert!(
2334            serialize_stream(&big_record_batch).len() > serialize_stream(&small_record_batch).len()
2335        );
2336        assert_eq!(
2337            serialize_stream(&small_record_batch).len(),
2338            serialize_stream(&record_batch_slice).len()
2339        );
2340
2341        assert_eq!(
2342            deserialize_stream(serialize_stream(&record_batch_slice)),
2343            record_batch_slice
2344        );
2345    }
2346
2347    #[test]
2348    fn truncate_ipc_record_batch_with_nulls() {
2349        fn create_batch() -> RecordBatch {
2350            let schema = Schema::new(vec![
2351                Field::new("a", DataType::Int32, true),
2352                Field::new("b", DataType::Utf8, true),
2353            ]);
2354
2355            let a = Int32Array::from(vec![Some(1), None, Some(1), None, Some(1)]);
2356            let b = StringArray::from(vec![None, Some("a"), Some("a"), None, Some("a")]);
2357
2358            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
2359        }
2360
2361        let record_batch = create_batch();
2362        let record_batch_slice = record_batch.slice(1, 2);
2363        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2364
2365        assert!(
2366            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2367        );
2368
2369        assert!(deserialized_batch.column(0).is_null(0));
2370        assert!(deserialized_batch.column(0).is_valid(1));
2371        assert!(deserialized_batch.column(1).is_valid(0));
2372        assert!(deserialized_batch.column(1).is_valid(1));
2373
2374        assert_eq!(record_batch_slice, deserialized_batch);
2375    }
2376
2377    #[test]
2378    fn truncate_ipc_dictionary_array() {
2379        fn create_batch() -> RecordBatch {
2380            let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
2381                .into_iter()
2382                .collect();
2383            let keys: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
2384
2385            let array = DictionaryArray::new(keys, Arc::new(values));
2386
2387            let schema = Schema::new(vec![Field::new("dict", array.data_type().clone(), true)]);
2388
2389            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
2390        }
2391
2392        let record_batch = create_batch();
2393        let record_batch_slice = record_batch.slice(1, 2);
2394        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2395
2396        assert!(
2397            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2398        );
2399
2400        assert!(deserialized_batch.column(0).is_valid(0));
2401        assert!(deserialized_batch.column(0).is_null(1));
2402
2403        assert_eq!(record_batch_slice, deserialized_batch);
2404    }
2405
2406    #[test]
2407    fn truncate_ipc_struct_array() {
2408        fn create_batch() -> RecordBatch {
2409            let strings: StringArray = [Some("foo"), None, Some("bar"), Some("baz")]
2410                .into_iter()
2411                .collect();
2412            let ints: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
2413
2414            let struct_array = StructArray::from(vec![
2415                (
2416                    Arc::new(Field::new("s", DataType::Utf8, true)),
2417                    Arc::new(strings) as ArrayRef,
2418                ),
2419                (
2420                    Arc::new(Field::new("c", DataType::Int32, true)),
2421                    Arc::new(ints) as ArrayRef,
2422                ),
2423            ]);
2424
2425            let schema = Schema::new(vec![Field::new(
2426                "struct_array",
2427                struct_array.data_type().clone(),
2428                true,
2429            )]);
2430
2431            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(struct_array)]).unwrap()
2432        }
2433
2434        let record_batch = create_batch();
2435        let record_batch_slice = record_batch.slice(1, 2);
2436        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2437
2438        assert!(
2439            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2440        );
2441
2442        let structs = deserialized_batch
2443            .column(0)
2444            .as_any()
2445            .downcast_ref::<StructArray>()
2446            .unwrap();
2447
2448        assert!(structs.column(0).is_null(0));
2449        assert!(structs.column(0).is_valid(1));
2450        assert!(structs.column(1).is_valid(0));
2451        assert!(structs.column(1).is_null(1));
2452        assert_eq!(record_batch_slice, deserialized_batch);
2453    }
2454
2455    #[test]
2456    fn truncate_ipc_string_array_with_all_empty_string() {
2457        fn create_batch() -> RecordBatch {
2458            let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
2459            let a = StringArray::from(vec![Some(""), Some(""), Some(""), Some(""), Some("")]);
2460            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap()
2461        }
2462
2463        let record_batch = create_batch();
2464        let record_batch_slice = record_batch.slice(0, 1);
2465        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2466
2467        assert!(
2468            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2469        );
2470        assert_eq!(record_batch_slice, deserialized_batch);
2471    }
2472
2473    #[test]
2474    fn test_stream_writer_writes_array_slice() {
2475        let array = UInt32Array::from(vec![Some(1), Some(2), Some(3)]);
2476        assert_eq!(
2477            vec![Some(1), Some(2), Some(3)],
2478            array.iter().collect::<Vec<_>>()
2479        );
2480
2481        let sliced = array.slice(1, 2);
2482        assert_eq!(vec![Some(2), Some(3)], sliced.iter().collect::<Vec<_>>());
2483
2484        let batch = RecordBatch::try_new(
2485            Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)])),
2486            vec![Arc::new(sliced)],
2487        )
2488        .expect("new batch");
2489
2490        let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
2491        writer.write(&batch).expect("write");
2492        let outbuf = writer.into_inner().expect("inner");
2493
2494        let mut reader = StreamReader::try_new(&outbuf[..], None).expect("new reader");
2495        let read_batch = reader.next().unwrap().expect("read batch");
2496
2497        let read_array: &UInt32Array = read_batch.column(0).as_primitive();
2498        assert_eq!(
2499            vec![Some(2), Some(3)],
2500            read_array.iter().collect::<Vec<_>>()
2501        );
2502    }
2503
2504    #[test]
2505    fn test_large_slice_uint32() {
2506        ensure_roundtrip(Arc::new(UInt32Array::from_iter((0..8000).map(|i| {
2507            if i % 2 == 0 {
2508                Some(i)
2509            } else {
2510                None
2511            }
2512        }))));
2513    }
2514
2515    #[test]
2516    fn test_large_slice_string() {
2517        let strings: Vec<_> = (0..8000)
2518            .map(|i| {
2519                if i % 2 == 0 {
2520                    Some(format!("value{}", i))
2521                } else {
2522                    None
2523                }
2524            })
2525            .collect();
2526
2527        ensure_roundtrip(Arc::new(StringArray::from(strings)));
2528    }
2529
2530    #[test]
2531    fn test_large_slice_string_list() {
2532        let mut ls = ListBuilder::new(StringBuilder::new());
2533
2534        let mut s = String::new();
2535        for row_number in 0..8000 {
2536            if row_number % 2 == 0 {
2537                for list_element in 0..1000 {
2538                    s.clear();
2539                    use std::fmt::Write;
2540                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
2541                    ls.values().append_value(&s);
2542                }
2543                ls.append(true)
2544            } else {
2545                ls.append(false); // null
2546            }
2547        }
2548
2549        ensure_roundtrip(Arc::new(ls.finish()));
2550    }
2551
2552    #[test]
2553    fn test_large_slice_string_list_of_lists() {
2554        // The reason for the special test is to verify reencode_offsets which looks both at
2555        // the starting offset and the data offset.  So we need a dataset where the starting offset
2556        // is zero but the data offset is not.
2557        let mut ls = ListBuilder::new(ListBuilder::new(StringBuilder::new()));
2558
2559        for _ in 0..4000 {
2560            ls.values().append(true);
2561            ls.append(true)
2562        }
2563
2564        let mut s = String::new();
2565        for row_number in 0..4000 {
2566            if row_number % 2 == 0 {
2567                for list_element in 0..1000 {
2568                    s.clear();
2569                    use std::fmt::Write;
2570                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
2571                    ls.values().values().append_value(&s);
2572                }
2573                ls.values().append(true);
2574                ls.append(true)
2575            } else {
2576                ls.append(false); // null
2577            }
2578        }
2579
2580        ensure_roundtrip(Arc::new(ls.finish()));
2581    }
2582
2583    /// Read/write a record batch to a File and Stream and ensure it is the same at the output
2584    fn ensure_roundtrip(array: ArrayRef) {
2585        let num_rows = array.len();
2586        let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
2587        // take off the first element
2588        let sliced_batch = orig_batch.slice(1, num_rows - 1);
2589
2590        let schema = orig_batch.schema();
2591        let stream_data = {
2592            let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
2593            writer.write(&sliced_batch).unwrap();
2594            writer.into_inner().unwrap()
2595        };
2596        let read_batch = {
2597            let projection = None;
2598            let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
2599            reader
2600                .next()
2601                .expect("expect no errors reading batch")
2602                .expect("expect batch")
2603        };
2604        assert_eq!(sliced_batch, read_batch);
2605
2606        let file_data = {
2607            let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
2608            writer.write(&sliced_batch).unwrap();
2609            writer.into_inner().unwrap().into_inner().unwrap()
2610        };
2611        let read_batch = {
2612            let projection = None;
2613            let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
2614            reader
2615                .next()
2616                .expect("expect no errors reading batch")
2617                .expect("expect batch")
2618        };
2619        assert_eq!(sliced_batch, read_batch);
2620
2621        // TODO test file writer/reader
2622    }
2623
2624    #[test]
2625    fn encode_bools_slice() {
2626        // Test case for https://github.com/apache/arrow-rs/issues/3496
2627        assert_bool_roundtrip([true, false], 1, 1);
2628
2629        // slice somewhere in the middle
2630        assert_bool_roundtrip(
2631            [
2632                true, false, true, true, false, false, true, true, true, false, false, false, true,
2633                true, true, true, false, false, false, false, true, true, true, true, true, false,
2634                false, false, false, false,
2635            ],
2636            13,
2637            17,
2638        );
2639
2640        // start at byte boundary, end in the middle
2641        assert_bool_roundtrip(
2642            [
2643                true, false, true, true, false, false, true, true, true, false, false, false,
2644            ],
2645            8,
2646            2,
2647        );
2648
2649        // start and stop at a byte boundary
2650        assert_bool_roundtrip(
2651            [
2652                true, false, true, true, false, false, true, true, true, false, false, false, true,
2653                true, true, true, true, false, false, false, false, false,
2654            ],
2655            8,
2656            8,
2657        );
2658    }
2659
2660    fn assert_bool_roundtrip<const N: usize>(bools: [bool; N], offset: usize, length: usize) {
2661        let val_bool_field = Field::new("val", DataType::Boolean, false);
2662
2663        let schema = Arc::new(Schema::new(vec![val_bool_field]));
2664
2665        let bools = BooleanArray::from(bools.to_vec());
2666
2667        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(bools)]).unwrap();
2668        let batch = batch.slice(offset, length);
2669
2670        let data = serialize_stream(&batch);
2671        let batch2 = deserialize_stream(data);
2672        assert_eq!(batch, batch2);
2673    }
2674
2675    #[test]
2676    fn test_run_array_unslice() {
2677        let total_len = 80;
2678        let vals: Vec<Option<i32>> = vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
2679        let repeats: Vec<usize> = vec![3, 4, 1, 2];
2680        let mut input_array: Vec<Option<i32>> = Vec::with_capacity(total_len);
2681        for ix in 0_usize..32 {
2682            let repeat: usize = repeats[ix % repeats.len()];
2683            let val: Option<i32> = vals[ix % vals.len()];
2684            input_array.resize(input_array.len() + repeat, val);
2685        }
2686
2687        // Encode the input_array to run array
2688        let mut builder =
2689            PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
2690        builder.extend(input_array.iter().copied());
2691        let run_array = builder.finish();
2692
2693        // test for all slice lengths.
2694        for slice_len in 1..=total_len {
2695            // test for offset = 0, slice length = slice_len
2696            let sliced_run_array: RunArray<Int16Type> =
2697                run_array.slice(0, slice_len).into_data().into();
2698
2699            // Create unsliced run array.
2700            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
2701            let typed = unsliced_run_array
2702                .downcast::<PrimitiveArray<Int32Type>>()
2703                .unwrap();
2704            let expected: Vec<Option<i32>> = input_array.iter().take(slice_len).copied().collect();
2705            let actual: Vec<Option<i32>> = typed.into_iter().collect();
2706            assert_eq!(expected, actual);
2707
2708            // test for offset = total_len - slice_len, length = slice_len
2709            let sliced_run_array: RunArray<Int16Type> = run_array
2710                .slice(total_len - slice_len, slice_len)
2711                .into_data()
2712                .into();
2713
2714            // Create unsliced run array.
2715            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
2716            let typed = unsliced_run_array
2717                .downcast::<PrimitiveArray<Int32Type>>()
2718                .unwrap();
2719            let expected: Vec<Option<i32>> = input_array
2720                .iter()
2721                .skip(total_len - slice_len)
2722                .copied()
2723                .collect();
2724            let actual: Vec<Option<i32>> = typed.into_iter().collect();
2725            assert_eq!(expected, actual);
2726        }
2727    }
2728
2729    fn generate_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
2730        let mut ls = GenericListBuilder::<O, _>::new(UInt32Builder::new());
2731
2732        for i in 0..100_000 {
2733            for value in [i, i, i] {
2734                ls.values().append_value(value);
2735            }
2736            ls.append(true)
2737        }
2738
2739        ls.finish()
2740    }
2741
2742    fn generate_nested_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
2743        let mut ls =
2744            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
2745
2746        for _i in 0..10_000 {
2747            for j in 0..10 {
2748                for value in [j, j, j, j] {
2749                    ls.values().values().append_value(value);
2750                }
2751                ls.values().append(true)
2752            }
2753            ls.append(true);
2754        }
2755
2756        ls.finish()
2757    }
2758
2759    fn generate_nested_list_data_starting_at_zero<O: OffsetSizeTrait>() -> GenericListArray<O> {
2760        let mut ls =
2761            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
2762
2763        for _i in 0..999 {
2764            ls.values().append(true);
2765            ls.append(true);
2766        }
2767
2768        for j in 0..10 {
2769            for value in [j, j, j, j] {
2770                ls.values().values().append_value(value);
2771            }
2772            ls.values().append(true)
2773        }
2774        ls.append(true);
2775
2776        for i in 0..9_000 {
2777            for j in 0..10 {
2778                for value in [i + j, i + j, i + j, i + j] {
2779                    ls.values().values().append_value(value);
2780                }
2781                ls.values().append(true)
2782            }
2783            ls.append(true);
2784        }
2785
2786        ls.finish()
2787    }
2788
2789    fn generate_map_array_data() -> MapArray {
2790        let keys_builder = UInt32Builder::new();
2791        let values_builder = UInt32Builder::new();
2792
2793        let mut builder = MapBuilder::new(None, keys_builder, values_builder);
2794
2795        for i in 0..100_000 {
2796            for _j in 0..3 {
2797                builder.keys().append_value(i);
2798                builder.values().append_value(i * 2);
2799            }
2800            builder.append(true).unwrap();
2801        }
2802
2803        builder.finish()
2804    }
2805
2806    #[test]
2807    fn reencode_offsets_when_first_offset_is_not_zero() {
2808        let original_list = generate_list_data::<i32>();
2809        let original_data = original_list.into_data();
2810        let slice_data = original_data.slice(75, 7);
2811        let (new_offsets, original_start, length) =
2812            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
2813        assert_eq!(
2814            vec![0, 3, 6, 9, 12, 15, 18, 21],
2815            new_offsets.typed_data::<i32>()
2816        );
2817        assert_eq!(225, original_start);
2818        assert_eq!(21, length);
2819    }
2820
2821    #[test]
2822    fn reencode_offsets_when_first_offset_is_zero() {
2823        let mut ls = GenericListBuilder::<i32, _>::new(UInt32Builder::new());
2824        // ls = [[], [35, 42]]
2825        ls.append(true);
2826        ls.values().append_value(35);
2827        ls.values().append_value(42);
2828        ls.append(true);
2829        let original_list = ls.finish();
2830        let original_data = original_list.into_data();
2831
2832        let slice_data = original_data.slice(1, 1);
2833        let (new_offsets, original_start, length) =
2834            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
2835        assert_eq!(vec![0, 2], new_offsets.typed_data::<i32>());
2836        assert_eq!(0, original_start);
2837        assert_eq!(2, length);
2838    }
2839
2840    /// Ensure that serializing and deserializing both the full and sliced versions yields batches equal to the original input.
2841    /// Also ensure the serialized sliced version is significantly smaller than the serialized full version.
2842    fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {
2843        // test both full and sliced versions
2844        let in_sliced = in_batch.slice(999, 1);
2845
2846        let bytes_batch = serialize_file(&in_batch);
2847        let bytes_sliced = serialize_file(&in_sliced);
2848
2849        // serializing 1 row should be significantly smaller than serializing 100,000
2850        assert!(bytes_sliced.len() < (bytes_batch.len() / expected_size_factor));
2851
2852        // ensure both are still valid and equal to originals
2853        let out_batch = deserialize_file(bytes_batch);
2854        assert_eq!(in_batch, out_batch);
2855
2856        let out_sliced = deserialize_file(bytes_sliced);
2857        assert_eq!(in_sliced, out_sliced);
2858    }
2859
    #[test]
    fn encode_lists() {
        let val_inner = Field::new_list_field(DataType::UInt32, true);
        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
        let schema = Arc::new(Schema::new(vec![val_list_field]));

        let values = Arc::new(generate_list_data::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

    #[test]
    fn encode_empty_list() {
        let val_inner = Field::new_list_field(DataType::UInt32, true);
        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
        let schema = Arc::new(Schema::new(vec![val_list_field]));

        let values = Arc::new(generate_list_data::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values])
            .unwrap()
            .slice(999, 0);
        let out_batch = deserialize_file(serialize_file(&in_batch));
        assert_eq!(in_batch, out_batch);
    }

    #[test]
    fn encode_large_lists() {
        let val_inner = Field::new_list_field(DataType::UInt32, true);
        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
        let schema = Arc::new(Schema::new(vec![val_list_field]));

        let values = Arc::new(generate_list_data::<i64>());

        // round-trip both the full and sliced versions and ensure the sliced
        // serialization is significantly smaller than the full one
        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

    #[test]
    fn encode_nested_lists() {
        let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
        let inner_list_field = Arc::new(Field::new_list_field(DataType::List(inner_int), true));
        let list_field = Field::new("val", DataType::List(inner_list_field), true);
        let schema = Arc::new(Schema::new(vec![list_field]));

        let values = Arc::new(generate_nested_list_data::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

    #[test]
    fn encode_nested_lists_starting_at_zero() {
        let inner_int = Arc::new(Field::new("item", DataType::UInt32, true));
        let inner_list_field = Arc::new(Field::new("item", DataType::List(inner_int), true));
        let list_field = Field::new("val", DataType::List(inner_list_field), true);
        let schema = Arc::new(Schema::new(vec![list_field]));

        let values = Arc::new(generate_nested_list_data_starting_at_zero::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1);
    }

    #[test]
    fn encode_map_array() {
        let keys = Arc::new(Field::new("keys", DataType::UInt32, false));
        let values = Arc::new(Field::new("values", DataType::UInt32, true));
        let map_field = Field::new_map("map", "entries", keys, values, false, true);
        let schema = Arc::new(Schema::new(vec![map_field]));

        let values = Arc::new(generate_map_array_data());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

    #[test]
    fn test_decimal128_alignment16_is_sufficient() {
        const IPC_ALIGNMENT: usize = 16;

        // Test a bunch of different dimensions to ensure alignment is never an issue.
        // For example, if we only test `num_cols = 1` then even with alignment 8 this
        // test would _happen_ to pass, even though for different dimensions like
        // `num_cols = 2` it would fail.
        for num_cols in [1, 2, 3, 17, 50, 73, 99] {
            let num_rows = (num_cols * 7 + 11) % 100; // Deterministic swizzle

            let mut fields = Vec::new();
            let mut arrays = Vec::new();
            for i in 0..num_cols {
                let field = Field::new(format!("col_{}", i), DataType::Decimal128(38, 10), true);
                let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
                fields.push(field);
                arrays.push(Arc::new(array) as Arc<dyn Array>);
            }
            let schema = Schema::new(fields);
            let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();

            let mut writer = FileWriter::try_new_with_options(
                Vec::new(),
                batch.schema_ref(),
                IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
            )
            .unwrap();
            writer.write(&batch).unwrap();
            writer.finish().unwrap();

            let out: Vec<u8> = writer.into_inner().unwrap();

            let buffer = Buffer::from_vec(out);
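            // The file ends with a 4-byte footer length followed by the 6-byte
            // "ARROW1" magic, so the footer length field starts 10 bytes from the end.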
            let trailer_start = buffer.len() - 10;
            let footer_len =
                read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
            let footer =
                root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();

            let schema = fb_to_schema(footer.schema().unwrap());

            // Importantly we set `require_alignment`, checking that 16-byte alignment is sufficient
            // for `read_record_batch` later on to read the data in a zero-copy manner.
            let decoder =
                FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);

            let batches = footer.recordBatches().unwrap();

            let block = batches.get(0);
            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
            let data = buffer.slice_with_length(block.offset() as _, block_len);

            let batch2 = decoder.read_record_batch(block, &data).unwrap().unwrap();

            assert_eq!(batch, batch2);
        }
    }

    #[test]
    fn test_decimal128_alignment8_is_unaligned() {
        const IPC_ALIGNMENT: usize = 8;

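        // Decimal128 values are 16-byte i128s, so 8-byte alignment is insufficient
        // for zero-copy reads and the decoder below should report a misalignment error.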
        let num_cols = 2;
        let num_rows = 1;

        let mut fields = Vec::new();
        let mut arrays = Vec::new();
        for i in 0..num_cols {
            let field = Field::new(format!("col_{}", i), DataType::Decimal128(38, 10), true);
            let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
            fields.push(field);
            arrays.push(Arc::new(array) as Arc<dyn Array>);
        }
        let schema = Schema::new(fields);
        let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();

        let mut writer = FileWriter::try_new_with_options(
            Vec::new(),
            batch.schema_ref(),
            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
        )
        .unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();

        let out: Vec<u8> = writer.into_inner().unwrap();

        let buffer = Buffer::from_vec(out);
        let trailer_start = buffer.len() - 10;
        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();

        let schema = fb_to_schema(footer.schema().unwrap());

        // Importantly we set `require_alignment`, otherwise the error later is suppressed due to copying
        // to an aligned buffer in `ArrayDataBuilder.build_aligned`.
        let decoder =
            FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);

        let batches = footer.recordBatches().unwrap();

        let block = batches.get(0);
        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
        let data = buffer.slice_with_length(block.offset() as _, block_len);

        let result = decoder.read_record_batch(block, &data);

        let error = result.unwrap_err();
        assert_eq!(
            error.to_string(),
            "Invalid argument error: Misaligned buffers[0] in array of type Decimal128(38, 10), \
             offset from expected alignment of 16 by 8"
        );
    }

    #[test]
    fn test_flush() {
        // We write a schema which is small enough to fit into a buffer and not get flushed,
        // and then force the write with .flush().
        let num_cols = 2;
        let mut fields = Vec::new();
        let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
        for i in 0..num_cols {
            let field = Field::new(format!("col_{}", i), DataType::Decimal128(38, 10), true);
            fields.push(field);
        }
        let schema = Schema::new(fields);
        let inner_stream_writer = BufWriter::with_capacity(1024, Vec::new());
        let inner_file_writer = BufWriter::with_capacity(1024, Vec::new());
        let mut stream_writer =
            StreamWriter::try_new_with_options(inner_stream_writer, &schema, options.clone())
                .unwrap();
        let mut file_writer =
            FileWriter::try_new_with_options(inner_file_writer, &schema, options).unwrap();

        let stream_bytes_written_on_new = stream_writer.get_ref().get_ref().len();
        let file_bytes_written_on_new = file_writer.get_ref().get_ref().len();
        stream_writer.flush().unwrap();
        file_writer.flush().unwrap();
        let stream_bytes_written_on_flush = stream_writer.get_ref().get_ref().len();
        let file_bytes_written_on_flush = file_writer.get_ref().get_ref().len();
        let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
        // Finishing a stream writes the continuation bytes in MetadataVersion::V5 (4 bytes)
        // and then a length of 0 (4 bytes) for a total of 8 bytes.
        // Everything before that should have been flushed in the .flush() call.
        let expected_stream_flushed_bytes = stream_out.len() - 8;
        // A file write is the same as the stream write except for the leading magic string
        // ARROW1 plus padding, which is 8 bytes.
        let expected_file_flushed_bytes = expected_stream_flushed_bytes + 8;

        assert!(
            stream_bytes_written_on_new < stream_bytes_written_on_flush,
            "this test makes no sense if flush is not actually required"
        );
        assert!(
            file_bytes_written_on_new < file_bytes_written_on_flush,
            "this test makes no sense if flush is not actually required"
        );
        assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
        assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
    }

    #[test]
    fn test_roundtrip_list_of_fixed_list() -> Result<(), ArrowError> {
        let l1_type =
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, false)), 3);
        let l2_type = DataType::List(Arc::new(Field::new("item", l1_type.clone(), false)));

        let l0_builder = Float32Builder::new();
        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3).with_field(Arc::new(Field::new(
            "item",
            DataType::Float32,
            false,
        )));
        let mut l2_builder =
            ListBuilder::new(l1_builder).with_field(Arc::new(Field::new("item", l1_type, false)));

        for point in [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]] {
            l2_builder.values().values().append_value(point[0]);
            l2_builder.values().values().append_value(point[1]);
            l2_builder.values().values().append_value(point[2]);

            l2_builder.values().append(true);
        }
        l2_builder.append(true);

        let point = [10., 11., 12.];
        l2_builder.values().values().append_value(point[0]);
        l2_builder.values().values().append_value(point[1]);
        l2_builder.values().values().append_value(point[2]);

        l2_builder.values().append(true);
        l2_builder.append(true);

        let array = Arc::new(l2_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new("points", l2_type, false)],
            HashMap::default(),
        ));

        // Test a variety of combinations that include zero and non-zero offsets,
        // covering both partial slices and the remainder of the array
        test_slices(&array, &schema, 0, 1)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 1)?;

        Ok(())
    }

    #[test]
    fn test_roundtrip_list_of_fixed_list_w_nulls() -> Result<(), ArrowError> {
        let l0_builder = Float32Builder::new();
        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3);
        let mut l2_builder = ListBuilder::new(l1_builder);

        for point in [
            [Some(1.0), Some(2.0), None],
            [Some(4.0), Some(5.0), Some(6.0)],
            [None, Some(8.0), Some(9.0)],
        ] {
            for p in point {
                match p {
                    Some(p) => l2_builder.values().values().append_value(p),
                    None => l2_builder.values().values().append_null(),
                }
            }

            l2_builder.values().append(true);
        }
        l2_builder.append(true);

        let point = [Some(10.), None, None];
        for p in point {
            match p {
                Some(p) => l2_builder.values().values().append_value(p),
                None => l2_builder.values().values().append_null(),
            }
        }

        l2_builder.values().append(true);
        l2_builder.append(true);

        let array = Arc::new(l2_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new(
                "points",
                DataType::List(Arc::new(Field::new(
                    "item",
                    DataType::FixedSizeList(
                        Arc::new(Field::new("item", DataType::Float32, true)),
                        3,
                    ),
                    true,
                ))),
                true,
            )],
            HashMap::default(),
        ));

        // Test a variety of combinations that include zero and non-zero offsets,
        // covering both partial slices and the remainder of the array
        test_slices(&array, &schema, 0, 1)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 1)?;

        Ok(())
    }

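    /// Slices `parent_array` at `offset`/`length`, round-trips the slice through a
    /// `StreamWriter`/`StreamReader` pair, and asserts the batch read back equals the
    /// batch that was written.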
    fn test_slices(
        parent_array: &ArrayRef,
        schema: &SchemaRef,
        offset: usize,
        length: usize,
    ) -> Result<(), ArrowError> {
        let subarray = parent_array.slice(offset, length);
        let original_batch = RecordBatch::try_new(schema.clone(), vec![subarray])?;

        let mut bytes = Vec::new();
        let mut writer = StreamWriter::try_new(&mut bytes, schema)?;
        writer.write(&original_batch)?;
        writer.finish()?;

        let mut cursor = std::io::Cursor::new(bytes);
        let mut reader = StreamReader::try_new(&mut cursor, None)?;
        let returned_batch = reader.next().unwrap()?;

        assert_eq!(original_batch, returned_batch);

        Ok(())
    }

    #[test]
    fn test_roundtrip_fixed_list() -> Result<(), ArrowError> {
        let int_builder = Int64Builder::new();
        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3)
            .with_field(Arc::new(Field::new("item", DataType::Int64, false)));

        for point in [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]] {
            fixed_list_builder.values().append_value(point[0]);
            fixed_list_builder.values().append_value(point[1]);
            fixed_list_builder.values().append_value(point[2]);

            fixed_list_builder.append(true);
        }

        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new(
                "points",
                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, false)), 3),
                false,
            )],
            HashMap::default(),
        ));

        // Test a variety of combinations that include zero and non-zero offsets,
        // covering both partial slices and the remainder of the array
        test_slices(&array, &schema, 0, 4)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 3)?;
        test_slices(&array, &schema, 2, 1)?;

        Ok(())
    }

    #[test]
    fn test_roundtrip_fixed_list_w_nulls() -> Result<(), ArrowError> {
        let int_builder = Int64Builder::new();
        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3);

        for point in [
            [Some(1), Some(2), None],
            [Some(4), Some(5), Some(6)],
            [None, Some(8), Some(9)],
            [Some(10), None, None],
        ] {
            for p in point {
                match p {
                    Some(p) => fixed_list_builder.values().append_value(p),
                    None => fixed_list_builder.values().append_null(),
                }
            }

            fixed_list_builder.append(true);
        }

        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new(
                "points",
                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 3),
                true,
            )],
            HashMap::default(),
        ));

        // Test a variety of combinations that include zero and non-zero offsets,
        // covering both partial slices and the remainder of the array
        test_slices(&array, &schema, 0, 4)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 3)?;
        test_slices(&array, &schema, 2, 1)?;

        Ok(())
    }

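    /// Verifies that schema and field metadata are encoded in a deterministic order,
    /// even though the underlying `HashMap` has no defined iteration order.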
    #[test]
    fn test_metadata_encoding_ordering() {
        fn create_hash() -> u64 {
            let metadata: HashMap<String, String> = [
                ("a", "1"), //
                ("b", "2"), //
                ("c", "3"), //
                ("d", "4"), //
                ("e", "5"), //
            ]
            .into_iter()
            .map(|(k, v)| (k.to_owned(), v.to_owned()))
            .collect();

            // Set metadata on both the schema and a field within it.
            let schema = Arc::new(
                Schema::new(vec![
                    Field::new("a", DataType::Int64, true).with_metadata(metadata.clone())
                ])
                .with_metadata(metadata)
                .clone(),
            );
            let batch = RecordBatch::new_empty(schema.clone());

            let mut bytes = Vec::new();
            let mut w = StreamWriter::try_new(&mut bytes, batch.schema_ref()).unwrap();
            w.write(&batch).unwrap();
            w.finish().unwrap();

            let mut h = std::hash::DefaultHasher::new();
            h.write(&bytes);
            h.finish()
        }

        let expected = create_hash();

        // Since HashMap iteration order is non-deterministic and we cannot specify
        // our own Hasher for the metadata implementation, run the above code 20
        // times and verify the result does not change. This is not a perfect
        // guarantee, but it should be good enough.
        let all_passed = (0..20).all(|_| create_hash() == expected);
        assert!(all_passed);
    }
}