arrow_ipc/writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Arrow IPC File and Stream Writers
19//!
20//! # Notes
21//!
//! [`FileWriter`] and [`StreamWriter`] have similar interfaces,
//! however the [`FileWriter`] produces the IPC file format, which is read back
//! with a reader that supports [`Seek`]ing, while the [`StreamWriter`] produces
//! the IPC streaming format, which is read back sequentially
//!
//! [`Seek`]: std::io::Seek
26
27use std::cmp::min;
28use std::collections::HashMap;
29use std::io::{BufWriter, Write};
30use std::mem::size_of;
31use std::sync::Arc;
32
33use flatbuffers::FlatBufferBuilder;
34
35use arrow_array::builder::BufferBuilder;
36use arrow_array::cast::*;
37use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
38use arrow_array::*;
39use arrow_buffer::bit_util;
40use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
41use arrow_data::{ArrayData, ArrayDataBuilder, BufferSpec, layout};
42use arrow_schema::*;
43
44use crate::CONTINUATION_MARKER;
45use crate::compression::CompressionCodec;
46pub use crate::compression::CompressionContext;
47use crate::convert::IpcSchemaEncoder;
48
49/// IPC write options used to control the behaviour of the [`IpcDataGenerator`]
50#[derive(Debug, Clone)]
51pub struct IpcWriteOptions {
    /// Pad written memory buffers to a multiple of this many bytes.
    /// Must be 8, 16, 32, or 64 - defaults to 64.
54    alignment: u8,
55    /// The legacy format is for releases before 0.15.0, and uses metadata V4
56    write_legacy_ipc_format: bool,
57    /// The metadata version to write. The Rust IPC writer supports V4+
58    ///
59    /// *Default versions per crate*
60    ///
61    /// When creating the default IpcWriteOptions, the following metadata versions are used:
62    ///
    /// * version 2.0.0: V4, with legacy format enabled
    /// * version 4.0.0: V5
65    metadata_version: crate::MetadataVersion,
66    /// Compression, if desired. Will result in a runtime error
67    /// if the corresponding feature is not enabled
68    batch_compression_type: Option<crate::CompressionType>,
69    /// How to handle updating dictionaries in IPC messages
70    dictionary_handling: DictionaryHandling,
71}
72
73impl IpcWriteOptions {
74    /// Configures compression when writing IPC files.
75    ///
76    /// Will result in a runtime error if the corresponding feature
77    /// is not enabled
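    ///
    /// # Example
    ///
    /// A minimal sketch enabling LZ4 frame compression; actually compressing data at
    /// write time additionally requires the corresponding crate feature (e.g. `lz4`):
    ///
    /// ```
    /// # use arrow_ipc::writer::IpcWriteOptions;
    /// # use arrow_ipc::CompressionType;
    /// let options = IpcWriteOptions::default()
    ///     .try_with_compression(Some(CompressionType::LZ4_FRAME))
    ///     .unwrap();
    /// ```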
78    pub fn try_with_compression(
79        mut self,
80        batch_compression_type: Option<crate::CompressionType>,
81    ) -> Result<Self, ArrowError> {
82        self.batch_compression_type = batch_compression_type;
83
84        if self.batch_compression_type.is_some()
85            && self.metadata_version < crate::MetadataVersion::V5
86        {
87            return Err(ArrowError::InvalidArgumentError(
88                "Compression only supported in metadata v5 and above".to_string(),
89            ));
90        }
91        Ok(self)
92    }
93    /// Try to create IpcWriteOptions, checking for incompatible settings
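    ///
    /// # Example
    ///
    /// A small sketch showing both an accepted and a rejected combination:
    ///
    /// ```
    /// # use arrow_ipc::writer::IpcWriteOptions;
    /// # use arrow_ipc::MetadataVersion;
    /// // 8-byte buffer alignment with metadata V5 is accepted
    /// let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
    /// // the legacy IPC format is only compatible with metadata V4
    /// assert!(IpcWriteOptions::try_new(64, true, MetadataVersion::V5).is_err());
    /// ```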
94    pub fn try_new(
95        alignment: usize,
96        write_legacy_ipc_format: bool,
97        metadata_version: crate::MetadataVersion,
98    ) -> Result<Self, ArrowError> {
99        let is_alignment_valid =
100            alignment == 8 || alignment == 16 || alignment == 32 || alignment == 64;
101        if !is_alignment_valid {
102            return Err(ArrowError::InvalidArgumentError(
103                "Alignment should be 8, 16, 32, or 64.".to_string(),
104            ));
105        }
106        let alignment: u8 = u8::try_from(alignment).expect("range already checked");
107        match metadata_version {
108            crate::MetadataVersion::V1
109            | crate::MetadataVersion::V2
110            | crate::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
111                "Writing IPC metadata version 3 and lower not supported".to_string(),
112            )),
113            #[allow(deprecated)]
114            crate::MetadataVersion::V4 => Ok(Self {
115                alignment,
116                write_legacy_ipc_format,
117                metadata_version,
118                batch_compression_type: None,
119                dictionary_handling: DictionaryHandling::default(),
120            }),
121            crate::MetadataVersion::V5 => {
122                if write_legacy_ipc_format {
123                    Err(ArrowError::InvalidArgumentError(
124                        "Legacy IPC format only supported on metadata version 4".to_string(),
125                    ))
126                } else {
127                    Ok(Self {
128                        alignment,
129                        write_legacy_ipc_format,
130                        metadata_version,
131                        batch_compression_type: None,
132                        dictionary_handling: DictionaryHandling::default(),
133                    })
134                }
135            }
136            z => Err(ArrowError::InvalidArgumentError(format!(
137                "Unsupported crate::MetadataVersion {z:?}"
138            ))),
139        }
140    }
141
142    /// Configure how dictionaries are handled in IPC messages
143    pub fn with_dictionary_handling(mut self, dictionary_handling: DictionaryHandling) -> Self {
144        self.dictionary_handling = dictionary_handling;
145        self
146    }
147}
148
149impl Default for IpcWriteOptions {
150    fn default() -> Self {
151        Self {
152            alignment: 64,
153            write_legacy_ipc_format: false,
154            metadata_version: crate::MetadataVersion::V5,
155            batch_compression_type: None,
156            dictionary_handling: DictionaryHandling::default(),
157        }
158    }
159}
160
161#[derive(Debug, Default)]
162/// Handles low level details of encoding [`Array`] and [`Schema`] into the
163/// [Arrow IPC Format].
164///
165/// # Example
166/// ```
167/// # fn run() {
168/// # use std::sync::Arc;
169/// # use arrow_array::UInt64Array;
170/// # use arrow_array::RecordBatch;
171/// # use arrow_ipc::writer::{CompressionContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
172///
173/// // Create a record batch
174/// let batch = RecordBatch::try_from_iter(vec![
175///  ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _)
176/// ]).unwrap();
177///
/// // Error if dictionary ids are replaced.
179/// let error_on_replacement = true;
180/// let options = IpcWriteOptions::default();
181/// let mut dictionary_tracker = DictionaryTracker::new(error_on_replacement);
182///
183/// let mut compression_context = CompressionContext::default();
184///
185/// // encode the batch into zero or more encoded dictionaries
186/// // and the data for the actual array.
187/// let data_gen = IpcDataGenerator::default();
188/// let (encoded_dictionaries, encoded_message) = data_gen
189///   .encode(&batch, &mut dictionary_tracker, &options, &mut compression_context)
190///   .unwrap();
191/// # }
192/// ```
193///
194/// [Arrow IPC Format]: https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc
195pub struct IpcDataGenerator {}
196
197impl IpcDataGenerator {
    /// Converts a schema to an IPC message, assigning dictionary IDs using the
    /// `dictionary_tracker`, and returns it encoded inside [EncodedData] as a flatbuffer.
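    ///
    /// # Example
    ///
    /// A minimal sketch encoding a schema-only message (the field name used here is
    /// just illustrative):
    ///
    /// ```
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
    /// let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    /// let mut tracker = DictionaryTracker::new(false);
    /// let data_gen = IpcDataGenerator::default();
    /// let encoded = data_gen.schema_to_bytes_with_dictionary_tracker(
    ///     &schema,
    ///     &mut tracker,
    ///     &IpcWriteOptions::default(),
    /// );
    /// // schema messages carry no body, only flatbuffer metadata
    /// assert!(encoded.arrow_data.is_empty());
    /// ```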
200    pub fn schema_to_bytes_with_dictionary_tracker(
201        &self,
202        schema: &Schema,
203        dictionary_tracker: &mut DictionaryTracker,
204        write_options: &IpcWriteOptions,
205    ) -> EncodedData {
206        let mut fbb = FlatBufferBuilder::new();
207        let schema = {
208            let fb = IpcSchemaEncoder::new()
209                .with_dictionary_tracker(dictionary_tracker)
210                .schema_to_fb_offset(&mut fbb, schema);
211            fb.as_union_value()
212        };
213
214        let mut message = crate::MessageBuilder::new(&mut fbb);
215        message.add_version(write_options.metadata_version);
216        message.add_header_type(crate::MessageHeader::Schema);
217        message.add_bodyLength(0);
218        message.add_header(schema);
219        // TODO: custom metadata
220        let data = message.finish();
221        fbb.finish(data, None);
222
223        let data = fbb.finished_data();
224        EncodedData {
225            ipc_message: data.to_vec(),
226            arrow_data: vec![],
227        }
228    }
229
230    fn _encode_dictionaries<I: Iterator<Item = i64>>(
231        &self,
232        column: &ArrayRef,
233        encoded_dictionaries: &mut Vec<EncodedData>,
234        dictionary_tracker: &mut DictionaryTracker,
235        write_options: &IpcWriteOptions,
236        dict_id: &mut I,
237        compression_context: &mut CompressionContext,
238    ) -> Result<(), ArrowError> {
239        match column.data_type() {
240            DataType::Struct(fields) => {
241                let s = as_struct_array(column);
242                for (field, column) in fields.iter().zip(s.columns()) {
243                    self.encode_dictionaries(
244                        field,
245                        column,
246                        encoded_dictionaries,
247                        dictionary_tracker,
248                        write_options,
249                        dict_id,
250                        compression_context,
251                    )?;
252                }
253            }
254            DataType::RunEndEncoded(_, values) => {
255                let data = column.to_data();
256                if data.child_data().len() != 2 {
257                    return Err(ArrowError::InvalidArgumentError(format!(
258                        "The run encoded array should have exactly two child arrays. Found {}",
259                        data.child_data().len()
260                    )));
261                }
262                // The run_ends array is not expected to be dictionary encoded. Hence encode dictionaries
263                // only for values array.
264                let values_array = make_array(data.child_data()[1].clone());
265                self.encode_dictionaries(
266                    values,
267                    &values_array,
268                    encoded_dictionaries,
269                    dictionary_tracker,
270                    write_options,
271                    dict_id,
272                    compression_context,
273                )?;
274            }
275            DataType::List(field) => {
276                let list = as_list_array(column);
277                self.encode_dictionaries(
278                    field,
279                    list.values(),
280                    encoded_dictionaries,
281                    dictionary_tracker,
282                    write_options,
283                    dict_id,
284                    compression_context,
285                )?;
286            }
287            DataType::LargeList(field) => {
288                let list = as_large_list_array(column);
289                self.encode_dictionaries(
290                    field,
291                    list.values(),
292                    encoded_dictionaries,
293                    dictionary_tracker,
294                    write_options,
295                    dict_id,
296                    compression_context,
297                )?;
298            }
299            DataType::ListView(field) => {
300                let list = column.as_list_view::<i32>();
301                self.encode_dictionaries(
302                    field,
303                    list.values(),
304                    encoded_dictionaries,
305                    dictionary_tracker,
306                    write_options,
307                    dict_id,
308                    compression_context,
309                )?;
310            }
311            DataType::LargeListView(field) => {
312                let list = column.as_list_view::<i64>();
313                self.encode_dictionaries(
314                    field,
315                    list.values(),
316                    encoded_dictionaries,
317                    dictionary_tracker,
318                    write_options,
319                    dict_id,
320                    compression_context,
321                )?;
322            }
323            DataType::FixedSizeList(field, _) => {
324                let list = column
325                    .as_any()
326                    .downcast_ref::<FixedSizeListArray>()
327                    .expect("Unable to downcast to fixed size list array");
328                self.encode_dictionaries(
329                    field,
330                    list.values(),
331                    encoded_dictionaries,
332                    dictionary_tracker,
333                    write_options,
334                    dict_id,
335                    compression_context,
336                )?;
337            }
338            DataType::Map(field, _) => {
339                let map_array = as_map_array(column);
340
341                let (keys, values) = match field.data_type() {
342                    DataType::Struct(fields) if fields.len() == 2 => (&fields[0], &fields[1]),
343                    _ => panic!("Incorrect field data type {:?}", field.data_type()),
344                };
345
346                // keys
347                self.encode_dictionaries(
348                    keys,
349                    map_array.keys(),
350                    encoded_dictionaries,
351                    dictionary_tracker,
352                    write_options,
353                    dict_id,
354                    compression_context,
355                )?;
356
357                // values
358                self.encode_dictionaries(
359                    values,
360                    map_array.values(),
361                    encoded_dictionaries,
362                    dictionary_tracker,
363                    write_options,
364                    dict_id,
365                    compression_context,
366                )?;
367            }
368            DataType::Union(fields, _) => {
369                let union = as_union_array(column);
370                for (type_id, field) in fields.iter() {
371                    let column = union.child(type_id);
372                    self.encode_dictionaries(
373                        field,
374                        column,
375                        encoded_dictionaries,
376                        dictionary_tracker,
377                        write_options,
378                        dict_id,
379                        compression_context,
380                    )?;
381                }
382            }
383            _ => (),
384        }
385
386        Ok(())
387    }
388
389    #[allow(clippy::too_many_arguments)]
390    fn encode_dictionaries<I: Iterator<Item = i64>>(
391        &self,
392        field: &Field,
393        column: &ArrayRef,
394        encoded_dictionaries: &mut Vec<EncodedData>,
395        dictionary_tracker: &mut DictionaryTracker,
396        write_options: &IpcWriteOptions,
397        dict_id_seq: &mut I,
398        compression_context: &mut CompressionContext,
399    ) -> Result<(), ArrowError> {
400        match column.data_type() {
401            DataType::Dictionary(_key_type, _value_type) => {
402                let dict_data = column.to_data();
403                let dict_values = &dict_data.child_data()[0];
404
405                let values = make_array(dict_data.child_data()[0].clone());
406
407                self._encode_dictionaries(
408                    &values,
409                    encoded_dictionaries,
410                    dictionary_tracker,
411                    write_options,
412                    dict_id_seq,
413                    compression_context,
414                )?;
415
416                // It's important to only take the dict_id at this point, because the dict ID
417                // sequence is assigned depth-first, so we need to first encode children and have
418                // them take their assigned dict IDs before we take the dict ID for this field.
419                let dict_id = dict_id_seq.next().ok_or_else(|| {
420                    ArrowError::IpcError(format!("no dict id for field {}", field.name()))
421                })?;
422
423                match dictionary_tracker.insert_column(
424                    dict_id,
425                    column,
426                    write_options.dictionary_handling,
427                )? {
428                    DictionaryUpdate::None => {}
429                    DictionaryUpdate::New | DictionaryUpdate::Replaced => {
430                        encoded_dictionaries.push(self.dictionary_batch_to_bytes(
431                            dict_id,
432                            dict_values,
433                            write_options,
434                            false,
435                            compression_context,
436                        )?);
437                    }
438                    DictionaryUpdate::Delta(data) => {
439                        encoded_dictionaries.push(self.dictionary_batch_to_bytes(
440                            dict_id,
441                            &data,
442                            write_options,
443                            true,
444                            compression_context,
445                        )?);
446                    }
447                }
448            }
449            _ => self._encode_dictionaries(
450                column,
451                encoded_dictionaries,
452                dictionary_tracker,
453                write_options,
454                dict_id_seq,
455                compression_context,
456            )?,
457        }
458
459        Ok(())
460    }
461
462    /// Encodes a batch to a number of [EncodedData] items (dictionary batches + the record batch).
463    /// The [DictionaryTracker] keeps track of dictionaries with new `dict_id`s  (so they are only sent once)
464    /// Make sure the [DictionaryTracker] is initialized at the start of the stream.
465    pub fn encode(
466        &self,
467        batch: &RecordBatch,
468        dictionary_tracker: &mut DictionaryTracker,
469        write_options: &IpcWriteOptions,
470        compression_context: &mut CompressionContext,
471    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
472        let schema = batch.schema();
473        let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len());
474
475        let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter();
476
477        for (i, field) in schema.fields().iter().enumerate() {
478            let column = batch.column(i);
479            self.encode_dictionaries(
480                field,
481                column,
482                &mut encoded_dictionaries,
483                dictionary_tracker,
484                write_options,
485                &mut dict_id,
486                compression_context,
487            )?;
488        }
489
490        let encoded_message =
491            self.record_batch_to_bytes(batch, write_options, compression_context)?;
492        Ok((encoded_dictionaries, encoded_message))
493    }
494
495    /// Encodes a batch to a number of [EncodedData] items (dictionary batches + the record batch).
496    /// The [DictionaryTracker] keeps track of dictionaries with new `dict_id`s  (so they are only sent once)
497    /// Make sure the [DictionaryTracker] is initialized at the start of the stream.
498    #[deprecated(since = "57.0.0", note = "Use `encode` instead")]
499    pub fn encoded_batch(
500        &self,
501        batch: &RecordBatch,
502        dictionary_tracker: &mut DictionaryTracker,
503        write_options: &IpcWriteOptions,
504    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
505        self.encode(
506            batch,
507            dictionary_tracker,
508            write_options,
509            &mut Default::default(),
510        )
511    }
512
513    /// Write a `RecordBatch` into two sets of bytes, one for the header (crate::Message) and the
514    /// other for the batch's data
515    fn record_batch_to_bytes(
516        &self,
517        batch: &RecordBatch,
518        write_options: &IpcWriteOptions,
519        compression_context: &mut CompressionContext,
520    ) -> Result<EncodedData, ArrowError> {
521        let mut fbb = FlatBufferBuilder::new();
522
523        let mut nodes: Vec<crate::FieldNode> = vec![];
524        let mut buffers: Vec<crate::Buffer> = vec![];
525        let mut arrow_data: Vec<u8> = vec![];
526        let mut offset = 0;
527
528        // get the type of compression
529        let batch_compression_type = write_options.batch_compression_type;
530
531        let compression = batch_compression_type.map(|batch_compression_type| {
532            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
533            c.add_method(crate::BodyCompressionMethod::BUFFER);
534            c.add_codec(batch_compression_type);
535            c.finish()
536        });
537
538        let compression_codec: Option<CompressionCodec> =
539            batch_compression_type.map(TryInto::try_into).transpose()?;
540
541        let mut variadic_buffer_counts = vec![];
542
543        for array in batch.columns() {
544            let array_data = array.to_data();
545            offset = write_array_data(
546                &array_data,
547                &mut buffers,
548                &mut arrow_data,
549                &mut nodes,
550                offset,
551                array.len(),
552                array.null_count(),
553                compression_codec,
554                compression_context,
555                write_options,
556            )?;
557
558            append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data);
559        }
560        // pad the tail of body data
561        let len = arrow_data.len();
562        let pad_len = pad_to_alignment(write_options.alignment, len);
563        arrow_data.extend_from_slice(&PADDING[..pad_len]);
564
565        // write data
566        let buffers = fbb.create_vector(&buffers);
567        let nodes = fbb.create_vector(&nodes);
568        let variadic_buffer = if variadic_buffer_counts.is_empty() {
569            None
570        } else {
571            Some(fbb.create_vector(&variadic_buffer_counts))
572        };
573
574        let root = {
575            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
576            batch_builder.add_length(batch.num_rows() as i64);
577            batch_builder.add_nodes(nodes);
578            batch_builder.add_buffers(buffers);
579            if let Some(c) = compression {
580                batch_builder.add_compression(c);
581            }
582
583            if let Some(v) = variadic_buffer {
584                batch_builder.add_variadicBufferCounts(v);
585            }
586            let b = batch_builder.finish();
587            b.as_union_value()
588        };
        // create a crate::Message
590        let mut message = crate::MessageBuilder::new(&mut fbb);
591        message.add_version(write_options.metadata_version);
592        message.add_header_type(crate::MessageHeader::RecordBatch);
593        message.add_bodyLength(arrow_data.len() as i64);
594        message.add_header(root);
595        let root = message.finish();
596        fbb.finish(root, None);
597        let finished_data = fbb.finished_data();
598
599        Ok(EncodedData {
600            ipc_message: finished_data.to_vec(),
601            arrow_data,
602        })
603    }
604
605    /// Write dictionary values into two sets of bytes, one for the header (crate::Message) and the
606    /// other for the data
607    fn dictionary_batch_to_bytes(
608        &self,
609        dict_id: i64,
610        array_data: &ArrayData,
611        write_options: &IpcWriteOptions,
612        is_delta: bool,
613        compression_context: &mut CompressionContext,
614    ) -> Result<EncodedData, ArrowError> {
615        let mut fbb = FlatBufferBuilder::new();
616
617        let mut nodes: Vec<crate::FieldNode> = vec![];
618        let mut buffers: Vec<crate::Buffer> = vec![];
619        let mut arrow_data: Vec<u8> = vec![];
620
621        // get the type of compression
622        let batch_compression_type = write_options.batch_compression_type;
623
624        let compression = batch_compression_type.map(|batch_compression_type| {
625            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
626            c.add_method(crate::BodyCompressionMethod::BUFFER);
627            c.add_codec(batch_compression_type);
628            c.finish()
629        });
630
631        let compression_codec: Option<CompressionCodec> = batch_compression_type
632            .map(|batch_compression_type| batch_compression_type.try_into())
633            .transpose()?;
634
635        write_array_data(
636            array_data,
637            &mut buffers,
638            &mut arrow_data,
639            &mut nodes,
640            0,
641            array_data.len(),
642            array_data.null_count(),
643            compression_codec,
644            compression_context,
645            write_options,
646        )?;
647
648        let mut variadic_buffer_counts = vec![];
649        append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);
650
651        // pad the tail of body data
652        let len = arrow_data.len();
653        let pad_len = pad_to_alignment(write_options.alignment, len);
654        arrow_data.extend_from_slice(&PADDING[..pad_len]);
655
656        // write data
657        let buffers = fbb.create_vector(&buffers);
658        let nodes = fbb.create_vector(&nodes);
659        let variadic_buffer = if variadic_buffer_counts.is_empty() {
660            None
661        } else {
662            Some(fbb.create_vector(&variadic_buffer_counts))
663        };
664
665        let root = {
666            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
667            batch_builder.add_length(array_data.len() as i64);
668            batch_builder.add_nodes(nodes);
669            batch_builder.add_buffers(buffers);
670            if let Some(c) = compression {
671                batch_builder.add_compression(c);
672            }
673            if let Some(v) = variadic_buffer {
674                batch_builder.add_variadicBufferCounts(v);
675            }
676            batch_builder.finish()
677        };
678
679        let root = {
680            let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut fbb);
681            batch_builder.add_id(dict_id);
682            batch_builder.add_data(root);
683            batch_builder.add_isDelta(is_delta);
684            batch_builder.finish().as_union_value()
685        };
686
687        let root = {
688            let mut message_builder = crate::MessageBuilder::new(&mut fbb);
689            message_builder.add_version(write_options.metadata_version);
690            message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
691            message_builder.add_bodyLength(arrow_data.len() as i64);
692            message_builder.add_header(root);
693            message_builder.finish()
694        };
695
696        fbb.finish(root, None);
697        let finished_data = fbb.finished_data();
698
699        Ok(EncodedData {
700            ipc_message: finished_data.to_vec(),
701            arrow_data,
702        })
703    }
704}
705
706fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
707    match array.data_type() {
708        DataType::BinaryView | DataType::Utf8View => {
            // The spec documents that the counts only include the variadic buffers, not the view/null buffers.
710            // https://arrow.apache.org/docs/format/Columnar.html#variadic-buffers
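            // For example, a StringViewArray whose ArrayData holds one views buffer
            // plus three data buffers contributes a count of 3 here.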
711            counts.push(array.buffers().len() as i64 - 1);
712        }
713        DataType::Dictionary(_, _) => {
714            // Do nothing
715            // Dictionary types are handled in `encode_dictionaries`.
716        }
717        _ => {
718            for child in array.child_data() {
719                append_variadic_buffer_counts(counts, child)
720            }
721        }
722    }
723}
724
725pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError> {
726    match arr.data_type() {
727        DataType::RunEndEncoded(k, _) => match k.data_type() {
728            DataType::Int16 => {
729                Ok(into_zero_offset_run_array(RunArray::<Int16Type>::from(arr))?.into_data())
730            }
731            DataType::Int32 => {
732                Ok(into_zero_offset_run_array(RunArray::<Int32Type>::from(arr))?.into_data())
733            }
734            DataType::Int64 => {
735                Ok(into_zero_offset_run_array(RunArray::<Int64Type>::from(arr))?.into_data())
736            }
737            d => unreachable!("Unexpected data type {d}"),
738        },
739        d => Err(ArrowError::InvalidArgumentError(format!(
740            "The given array is not a run array. Data type of given array: {d}"
741        ))),
742    }
743}
744
745// Returns a `RunArray` with zero offset and length matching the last value
746// in run_ends array.
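//
// Illustrative example: a run array with run_ends [2, 5, 8] that is sliced with
// offset 3 and length 4 covers logical rows 3..7; the rebuilt array has offset 0,
// length 4, run_ends [2, 4], and values sliced to the two runs overlapping the slice.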
747fn into_zero_offset_run_array<R: RunEndIndexType>(
748    run_array: RunArray<R>,
749) -> Result<RunArray<R>, ArrowError> {
750    let run_ends = run_array.run_ends();
751    if run_ends.offset() == 0 && run_ends.max_value() == run_ends.len() {
752        return Ok(run_array);
753    }
754
    // The physical index of the original run_ends array from which the `ArrayData` is sliced.
756    let start_physical_index = run_ends.get_start_physical_index();
757
    // The physical index of the original run_ends array up to which the `ArrayData` is sliced.
759    let end_physical_index = run_ends.get_end_physical_index();
760
761    let physical_length = end_physical_index - start_physical_index + 1;
762
763    // build new run_ends array by subtracting offset from run ends.
764    let offset = R::Native::usize_as(run_ends.offset());
765    let mut builder = BufferBuilder::<R::Native>::new(physical_length);
766    for run_end_value in &run_ends.values()[start_physical_index..end_physical_index] {
767        builder.append(run_end_value.sub_wrapping(offset));
768    }
769    builder.append(R::Native::from_usize(run_array.len()).unwrap());
770    let new_run_ends = unsafe {
771        // Safety:
772        // The function builds a valid run_ends array and hence need not be validated.
773        ArrayDataBuilder::new(R::DATA_TYPE)
774            .len(physical_length)
775            .add_buffer(builder.finish())
776            .build_unchecked()
777    };
778
779    // build new values by slicing physical indices.
780    let new_values = run_array
781        .values()
782        .slice(start_physical_index, physical_length)
783        .into_data();
784
785    let builder = ArrayDataBuilder::new(run_array.data_type().clone())
786        .len(run_array.len())
787        .add_child_data(new_run_ends)
788        .add_child_data(new_values);
789    let array_data = unsafe {
790        // Safety:
791        //  This function builds a valid run array and hence can skip validation.
792        builder.build_unchecked()
793    };
794    Ok(array_data.into())
795}
796
797/// Controls how dictionaries are handled in Arrow IPC messages
798#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
799pub enum DictionaryHandling {
800    /// Send the entire dictionary every time it is encountered (default)
801    #[default]
802    Resend,
803    /// Send only new dictionary values since the last batch (delta encoding)
804    ///
805    /// When a dictionary is first encountered, the entire dictionary is sent.
806    /// For subsequent batches, only values that are new (not previously sent)
807    /// are transmitted with the `isDelta` flag set to true.
808    Delta,
809}
810
/// Describes what kind of update took place after a call to [`DictionaryTracker::insert_column`].
812#[derive(Debug, Clone)]
813pub enum DictionaryUpdate {
814    /// No dictionary was written, the dictionary was identical to what was already
815    /// in the tracker.
816    None,
817    /// No dictionary was present in the tracker
818    New,
819    /// Dictionary was replaced with the new data
820    Replaced,
821    /// Dictionary was updated, ArrayData is the delta between old and new
822    Delta(ArrayData),
823}
824
825/// Keeps track of dictionaries that have been written, to avoid emitting the same dictionary
826/// multiple times.
827///
828/// Can optionally error if an update to an existing dictionary is attempted, which
829/// isn't allowed in the `FileWriter`.
830#[derive(Debug)]
831pub struct DictionaryTracker {
832    // NOTE: When adding fields, update the clear() method accordingly.
833    written: HashMap<i64, ArrayData>,
834    dict_ids: Vec<i64>,
835    error_on_replacement: bool,
836}
837
838impl DictionaryTracker {
839    /// Create a new [`DictionaryTracker`].
840    ///
841    /// If `error_on_replacement`
842    /// is true, an error will be generated if an update to an
843    /// existing dictionary is attempted.
844    pub fn new(error_on_replacement: bool) -> Self {
845        #[allow(deprecated)]
846        Self {
847            written: HashMap::new(),
848            dict_ids: Vec::new(),
849            error_on_replacement,
850        }
851    }
852
853    /// Record and return the next dictionary ID.
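    ///
    /// # Example
    ///
    /// A small sketch; IDs are assigned sequentially starting at 0:
    ///
    /// ```
    /// # use arrow_ipc::writer::DictionaryTracker;
    /// let mut tracker = DictionaryTracker::new(false);
    /// assert_eq!(tracker.next_dict_id(), 0);
    /// assert_eq!(tracker.next_dict_id(), 1);
    /// ```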
854    pub fn next_dict_id(&mut self) -> i64 {
855        let next = self
856            .dict_ids
857            .last()
858            .copied()
859            .map(|i| i + 1)
860            .unwrap_or_default();
861
862        self.dict_ids.push(next);
863        next
864    }
865
866    /// Return the sequence of dictionary IDs in the order they should be observed while
867    /// traversing the schema
868    pub fn dict_id(&mut self) -> &[i64] {
869        &self.dict_ids
870    }
871
872    /// Keep track of the dictionary with the given ID and values. Behavior:
873    ///
874    /// * If this ID has been written already and has the same data, return `Ok(false)` to indicate
875    ///   that the dictionary was not actually inserted (because it's already been seen).
876    /// * If this ID has been written already but with different data, and this tracker is
877    ///   configured to return an error, return an error.
878    /// * If the tracker has not been configured to error on replacement or this dictionary
879    ///   has never been seen before, return `Ok(true)` to indicate that the dictionary was just
880    ///   inserted.
881    #[deprecated(since = "56.1.0", note = "Use `insert_column` instead")]
882    pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> {
883        let dict_data = column.to_data();
884        let dict_values = &dict_data.child_data()[0];
885
886        // If a dictionary with this id was already emitted, check if it was the same.
887        if let Some(last) = self.written.get(&dict_id) {
888            if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
889                // Same dictionary values => no need to emit it again
890                return Ok(false);
891            }
892            if self.error_on_replacement {
893                // If error on replacement perform a logical comparison
894                if last.child_data()[0] == *dict_values {
895                    // Same dictionary values => no need to emit it again
896                    return Ok(false);
897                }
898                return Err(ArrowError::InvalidArgumentError(
899                    "Dictionary replacement detected when writing IPC file format. \
900                     Arrow IPC files only support a single dictionary for a given field \
901                     across all batches."
902                        .to_string(),
903                ));
904            }
905        }
906
907        self.written.insert(dict_id, dict_data);
908        Ok(true)
909    }
910
911    /// Keep track of the dictionary with the given ID and values. The return
912    /// value indicates what, if any, update to the internal map took place
913    /// and how it should be interpreted based on the `dict_handling` parameter.
914    ///
915    /// # Returns
916    ///
    /// * `Ok(DictionaryUpdate::None)` - If an identical dictionary has already been written
    /// * `Ok(DictionaryUpdate::New)` - If the dictionary was not previously written
    /// * `Ok(DictionaryUpdate::Replaced)` - If the dictionary was previously written
    ///   with completely different data, or if the data is a delta of the existing,
    ///   but with `dict_handling` set to `DictionaryHandling::Resend`
    /// * `Ok(DictionaryUpdate::Delta)` - If the dictionary was previously written, but
    ///   the new data is a delta of the old and the `dict_handling` is set to
    ///   `DictionaryHandling::Delta`
924    /// * `Err(e)` - If the dictionary was previously written with different data,
925    ///   and `error_on_replacement` is set to `true`.
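    ///
    /// # Example
    ///
    /// A minimal sketch tracking a single dictionary column (assumes the
    /// `FromIterator<&str>` impl for `DictionaryArray` from `arrow_array`):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, DictionaryArray, types::Int32Type};
    /// # use arrow_ipc::writer::{DictionaryHandling, DictionaryTracker, DictionaryUpdate};
    /// let mut tracker = DictionaryTracker::new(false);
    /// let dict_id = tracker.next_dict_id();
    ///
    /// let array: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
    /// let column: ArrayRef = Arc::new(array);
    ///
    /// // the first insertion records the dictionary, so it must be written in full
    /// let update = tracker
    ///     .insert_column(dict_id, &column, DictionaryHandling::Resend)
    ///     .unwrap();
    /// assert!(matches!(update, DictionaryUpdate::New));
    ///
    /// // inserting the same values again requires no new dictionary message
    /// let update = tracker
    ///     .insert_column(dict_id, &column, DictionaryHandling::Resend)
    ///     .unwrap();
    /// assert!(matches!(update, DictionaryUpdate::None));
    /// ```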
926    pub fn insert_column(
927        &mut self,
928        dict_id: i64,
929        column: &ArrayRef,
930        dict_handling: DictionaryHandling,
931    ) -> Result<DictionaryUpdate, ArrowError> {
932        let new_data = column.to_data();
933        let new_values = &new_data.child_data()[0];
934
935        // If there is no existing dictionary with this ID, we always insert
936        let Some(old) = self.written.get(&dict_id) else {
937            self.written.insert(dict_id, new_data);
938            return Ok(DictionaryUpdate::New);
939        };
940
941        // Fast path - If the array data points to the same buffer as the
942        // existing then they're the same.
943        let old_values = &old.child_data()[0];
944        if ArrayData::ptr_eq(old_values, new_values) {
945            return Ok(DictionaryUpdate::None);
946        }
947
948        // Slow path - Compare the dictionaries value by value
949        let comparison = compare_dictionaries(old_values, new_values);
950        if matches!(comparison, DictionaryComparison::Equal) {
951            return Ok(DictionaryUpdate::None);
952        }
953
954        const REPLACEMENT_ERROR: &str = "Dictionary replacement detected when writing IPC file format. \
955                 Arrow IPC files only support a single dictionary for a given field \
956                 across all batches.";
957
958        match comparison {
959            DictionaryComparison::NotEqual => {
960                if self.error_on_replacement {
961                    return Err(ArrowError::InvalidArgumentError(
962                        REPLACEMENT_ERROR.to_string(),
963                    ));
964                }
965
966                self.written.insert(dict_id, new_data);
967                Ok(DictionaryUpdate::Replaced)
968            }
969            DictionaryComparison::Delta => match dict_handling {
970                DictionaryHandling::Resend => {
971                    if self.error_on_replacement {
972                        return Err(ArrowError::InvalidArgumentError(
973                            REPLACEMENT_ERROR.to_string(),
974                        ));
975                    }
976
977                    self.written.insert(dict_id, new_data);
978                    Ok(DictionaryUpdate::Replaced)
979                }
980                DictionaryHandling::Delta => {
981                    let delta =
982                        new_values.slice(old_values.len(), new_values.len() - old_values.len());
983                    self.written.insert(dict_id, new_data);
984                    Ok(DictionaryUpdate::Delta(delta))
985                }
986            },
987            DictionaryComparison::Equal => unreachable!("Already checked equal case"),
988        }
989    }
990
991    /// Clears the state of the dictionary tracker.
992    ///
993    /// This allows the dictionary tracker to be reused for a new IPC stream while avoiding the
994    /// allocation cost of creating a new instance. This method should not be called if
995    /// the dictionary tracker will be used to continue writing to an existing IPC stream.
996    pub fn clear(&mut self) {
997        self.dict_ids.clear();
998        self.written.clear();
999    }
1000}
1001
1002/// Describes how two dictionary arrays compare to each other.
1003#[derive(Debug, Clone)]
1004enum DictionaryComparison {
1005    /// Neither a delta, nor an exact match
1006    NotEqual,
1007    /// Exact element-wise match
1008    Equal,
1009    /// The two arrays are dictionary deltas of each other, meaning the first
1010    /// is a prefix of the second.
1011    Delta,
1012}
1013
1014// Compares two dictionaries and returns a [`DictionaryComparison`].
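//
// Illustrative examples: old = ["a", "b"] vs new = ["a", "b", "c"] is `Delta`;
// old = ["a", "b"] vs new = ["a", "c"] is `NotEqual`; identical values are `Equal`.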
1015fn compare_dictionaries(old: &ArrayData, new: &ArrayData) -> DictionaryComparison {
1016    // Check for exact match
1017    let existing_len = old.len();
1018    let new_len = new.len();
1019    if existing_len == new_len {
1020        if *old == *new {
1021            return DictionaryComparison::Equal;
1022        } else {
1023            return DictionaryComparison::NotEqual;
1024        }
1025    }
1026
1027    // Can't be a delta if the new is shorter than the existing
1028    if new_len < existing_len {
1029        return DictionaryComparison::NotEqual;
1030    }
1031
1032    // Check for delta
1033    if new.slice(0, existing_len) == *old {
1034        return DictionaryComparison::Delta;
1035    }
1036
1037    DictionaryComparison::NotEqual
1038}
1039
1040/// Arrow File Writer
1041///
1042/// Writes Arrow [`RecordBatch`]es in the [IPC File Format].
1043///
1044/// # See Also
1045///
1046/// * [`StreamWriter`] for writing IPC Streams
1047///
1048/// # Example
1049/// ```
1050/// # use arrow_array::record_batch;
1051/// # use arrow_ipc::writer::FileWriter;
1052/// # let mut file = vec![]; // mimic a file for the example
1053/// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
1054/// // create a new writer, the schema must be known in advance
1055/// let mut writer = FileWriter::try_new(&mut file, &batch.schema()).unwrap();
1056/// // write each batch to the underlying writer
1057/// writer.write(&batch).unwrap();
1058/// // When all batches are written, call finish to flush all buffers
1059/// writer.finish().unwrap();
1060/// ```
1061/// [IPC File Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format
1062pub struct FileWriter<W> {
1063    /// The object to write to
1064    writer: W,
1065    /// IPC write options
1066    write_options: IpcWriteOptions,
1067    /// A reference to the schema, used in validating record batches
1068    schema: SchemaRef,
    /// The byte offset at which the next block will be written; used to record
    /// block locations in the footer for random access
1070    block_offsets: usize,
1071    /// Dictionary blocks that will be written as part of the IPC footer
1072    dictionary_blocks: Vec<crate::Block>,
1073    /// Record blocks that will be written as part of the IPC footer
1074    record_blocks: Vec<crate::Block>,
    /// Whether the file footer has been written, and the writer is finished
1076    finished: bool,
1077    /// Keeps track of dictionaries that have been written
1078    dictionary_tracker: DictionaryTracker,
1079    /// User level customized metadata
1080    custom_metadata: HashMap<String, String>,
1081
1082    data_gen: IpcDataGenerator,
1083
1084    compression_context: CompressionContext,
1085}
1086
1087impl<W: Write> FileWriter<BufWriter<W>> {
1088    /// Try to create a new file writer with the writer wrapped in a BufWriter.
1089    ///
1090    /// See [`FileWriter::try_new`] for an unbuffered version.
1091    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1092        Self::try_new(BufWriter::new(writer), schema)
1093    }
1094}
1095
1096impl<W: Write> FileWriter<W> {
1097    /// Try to create a new writer, with the schema written as part of the header
1098    ///
1099    /// Note the created writer is not buffered. See [`FileWriter::try_new_buffered`] for details.
1100    ///
1101    /// # Errors
1102    ///
    /// An [`Err`](Result::Err) may be returned if writing the header to the writer fails.
1104    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1105        let write_options = IpcWriteOptions::default();
1106        Self::try_new_with_options(writer, schema, write_options)
1107    }
1108
1109    /// Try to create a new writer with IpcWriteOptions
1110    ///
1111    /// Note the created writer is not buffered. See [`FileWriter::try_new_buffered`] for details.
1112    ///
1113    /// # Errors
1114    ///
    /// An [`Err`](Result::Err) may be returned if writing the header to the writer fails.
1116    pub fn try_new_with_options(
1117        mut writer: W,
1118        schema: &Schema,
1119        write_options: IpcWriteOptions,
1120    ) -> Result<Self, ArrowError> {
1121        let data_gen = IpcDataGenerator::default();
1122        // write magic to header aligned on alignment boundary
1123        let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
1124        let header_size = super::ARROW_MAGIC.len() + pad_len;
1125        writer.write_all(&super::ARROW_MAGIC)?;
1126        writer.write_all(&PADDING[..pad_len])?;
1127        // write the schema, set the written bytes to the schema + header
1128        let mut dictionary_tracker = DictionaryTracker::new(true);
1129        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
1130            schema,
1131            &mut dictionary_tracker,
1132            &write_options,
1133        );
1134        let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
1135        Ok(Self {
1136            writer,
1137            write_options,
1138            schema: Arc::new(schema.clone()),
1139            block_offsets: meta + data + header_size,
1140            dictionary_blocks: vec![],
1141            record_blocks: vec![],
1142            finished: false,
1143            dictionary_tracker,
1144            custom_metadata: HashMap::new(),
1145            data_gen,
1146            compression_context: CompressionContext::default(),
1147        })
1148    }
1149
1150    /// Adds a key-value pair to the [FileWriter]'s custom metadata
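    ///
    /// The metadata is written to the file footer when [`FileWriter::finish`] is called.
    ///
    /// # Example
    ///
    /// A minimal sketch (the key and value are just illustrative):
    ///
    /// ```
    /// # use arrow_array::record_batch;
    /// # use arrow_ipc::writer::FileWriter;
    /// # let mut file = vec![];
    /// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
    /// let mut writer = FileWriter::try_new(&mut file, &batch.schema()).unwrap();
    /// writer.write_metadata("created_by", "arrow-ipc example");
    /// writer.write(&batch).unwrap();
    /// writer.finish().unwrap();
    /// ```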
1151    pub fn write_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
1152        self.custom_metadata.insert(key.into(), value.into());
1153    }
1154
1155    /// Write a record batch to the file
1156    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1157        if self.finished {
1158            return Err(ArrowError::IpcError(
1159                "Cannot write record batch to file writer as it is closed".to_string(),
1160            ));
1161        }
1162
1163        let (encoded_dictionaries, encoded_message) = self.data_gen.encode(
1164            batch,
1165            &mut self.dictionary_tracker,
1166            &self.write_options,
1167            &mut self.compression_context,
1168        )?;
1169
1170        for encoded_dictionary in encoded_dictionaries {
1171            let (meta, data) =
1172                write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
1173
1174            let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
1175            self.dictionary_blocks.push(block);
1176            self.block_offsets += meta + data;
1177        }
1178
1179        let (meta, data) = write_message(&mut self.writer, encoded_message, &self.write_options)?;
1180
1181        // add a record block for the footer
1182        let block = crate::Block::new(
1183            self.block_offsets as i64,
1184            meta as i32, // TODO: is this still applicable?
1185            data as i64,
1186        );
1187        self.record_blocks.push(block);
1188        self.block_offsets += meta + data;
1189        Ok(())
1190    }
1191
1192    /// Write footer and closing tag, then mark the writer as done
1193    pub fn finish(&mut self) -> Result<(), ArrowError> {
1194        if self.finished {
1195            return Err(ArrowError::IpcError(
1196                "Cannot write footer to file writer as it is closed".to_string(),
1197            ));
1198        }
1199
1200        // write EOS
1201        write_continuation(&mut self.writer, &self.write_options, 0)?;
1202
1203        let mut fbb = FlatBufferBuilder::new();
1204        let dictionaries = fbb.create_vector(&self.dictionary_blocks);
1205        let record_batches = fbb.create_vector(&self.record_blocks);
1206
1207        // dictionaries are already written, so we can reset dictionary tracker to reuse for schema
1208        self.dictionary_tracker.clear();
1209        let schema = IpcSchemaEncoder::new()
1210            .with_dictionary_tracker(&mut self.dictionary_tracker)
1211            .schema_to_fb_offset(&mut fbb, &self.schema);
1212        let fb_custom_metadata = (!self.custom_metadata.is_empty())
1213            .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));
1214
1215        let root = {
1216            let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
1217            footer_builder.add_version(self.write_options.metadata_version);
1218            footer_builder.add_schema(schema);
1219            footer_builder.add_dictionaries(dictionaries);
1220            footer_builder.add_recordBatches(record_batches);
1221            if let Some(fb_custom_metadata) = fb_custom_metadata {
1222                footer_builder.add_custom_metadata(fb_custom_metadata);
1223            }
1224            footer_builder.finish()
1225        };
1226        fbb.finish(root, None);
1227        let footer_data = fbb.finished_data();
1228        self.writer.write_all(footer_data)?;
1229        self.writer
1230            .write_all(&(footer_data.len() as i32).to_le_bytes())?;
1231        self.writer.write_all(&super::ARROW_MAGIC)?;
1232        self.writer.flush()?;
1233        self.finished = true;
1234
1235        Ok(())
1236    }
1237
1238    /// Returns the arrow [`SchemaRef`] for this arrow file.
1239    pub fn schema(&self) -> &SchemaRef {
1240        &self.schema
1241    }
1242
1243    /// Gets a reference to the underlying writer.
1244    pub fn get_ref(&self) -> &W {
1245        &self.writer
1246    }
1247
1248    /// Gets a mutable reference to the underlying writer.
1249    ///
1250    /// It is inadvisable to directly write to the underlying writer.
1251    pub fn get_mut(&mut self) -> &mut W {
1252        &mut self.writer
1253    }
1254
1255    /// Flush the underlying writer.
1256    ///
    /// If the underlying writer is buffered (e.g. a [`BufWriter`]), both the buffer
    /// and the inner writer are flushed.
1258    pub fn flush(&mut self) -> Result<(), ArrowError> {
1259        self.writer.flush()?;
1260        Ok(())
1261    }
1262
1263    /// Unwraps the underlying writer.
1264    ///
1265    /// The writer is flushed and the FileWriter is finished before returning.
1266    ///
1267    /// # Errors
1268    ///
    /// An [`Err`](Result::Err) may be returned if an error occurs while finishing the
    /// [`FileWriter`] or while flushing the writer.
1271    pub fn into_inner(mut self) -> Result<W, ArrowError> {
1272        if !self.finished {
1273            // `finish` flushes the writer.
1274            self.finish()?;
1275        }
1276        Ok(self.writer)
1277    }
1278}
1279
1280impl<W: Write> RecordBatchWriter for FileWriter<W> {
1281    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1282        self.write(batch)
1283    }
1284
1285    fn close(mut self) -> Result<(), ArrowError> {
1286        self.finish()
1287    }
1288}
1289
1290/// Arrow Stream Writer
1291///
1292/// Writes Arrow [`RecordBatch`]es to bytes using the [IPC Streaming Format].
1293///
1294/// # See Also
1295///
1296/// * [`FileWriter`] for writing IPC Files
1297///
1298/// # Example - Basic usage
1299/// ```
1300/// # use arrow_array::record_batch;
1301/// # use arrow_ipc::writer::StreamWriter;
1302/// # let mut stream = vec![]; // mimic a stream for the example
1303/// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
1304/// // create a new writer, the schema must be known in advance
1305/// let mut writer = StreamWriter::try_new(&mut stream, &batch.schema()).unwrap();
1306/// // write each batch to the underlying stream
1307/// writer.write(&batch).unwrap();
1308/// // When all batches are written, call finish to flush all buffers
1309/// writer.finish().unwrap();
1310/// ```
1311/// # Example - Efficient delta dictionaries
1312/// ```
1313/// # use arrow_array::record_batch;
1314/// # use arrow_ipc::writer::{StreamWriter, IpcWriteOptions};
1315/// # use arrow_ipc::writer::DictionaryHandling;
1316/// # use arrow_schema::{DataType, Field, Schema, SchemaRef};
1317/// # use arrow_array::{
1318/// #    builder::StringDictionaryBuilder, types::Int32Type, Array, ArrayRef, DictionaryArray,
1319/// #    RecordBatch, StringArray,
1320/// # };
1321/// # use std::sync::Arc;
1322///
1323/// let schema = Arc::new(Schema::new(vec![Field::new(
1324///    "col1",
1325///    DataType::Dictionary(Box::from(DataType::Int32), Box::from(DataType::Utf8)),
1326///    true,
1327/// )]));
1328///
1329/// let mut builder = StringDictionaryBuilder::<arrow_array::types::Int32Type>::new();
1330///
1331/// // `finish_preserve_values` will keep the dictionary values along with their
1332/// // key assignments so that they can be re-used in the next batch.
1333/// builder.append("a").unwrap();
1334/// builder.append("b").unwrap();
1335/// let array1 = builder.finish_preserve_values();
1336/// let batch1 = RecordBatch::try_new(schema.clone(), vec![Arc::new(array1) as ArrayRef]).unwrap();
1337///
1338/// // In this batch, 'a' will have the same dictionary key as 'a' in the previous batch,
1339/// // and 'd' will take the next available key.
1340/// builder.append("a").unwrap();
1341/// builder.append("d").unwrap();
1342/// let array2 = builder.finish_preserve_values();
1343/// let batch2 = RecordBatch::try_new(schema.clone(), vec![Arc::new(array2) as ArrayRef]).unwrap();
1344///
1345/// let mut stream = vec![];
1346/// // You must set `.with_dictionary_handling(DictionaryHandling::Delta)` to
1347/// // enable delta dictionaries in the writer
1348/// let options = IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta);
/// let mut writer = StreamWriter::try_new_with_options(&mut stream, &schema, options).unwrap();
1350///
1351/// // When writing the first batch, a dictionary message with 'a' and 'b' will be written
1352/// // prior to the record batch.
1353/// writer.write(&batch1).unwrap();
1354/// // With the second batch only a delta dictionary with 'd' will be written
1355/// // prior to the record batch. This is only possible with `finish_preserve_values`.
1356/// // Without it, 'a' and 'd' in this batch would have different keys than the
1357/// // first batch and so we'd have to send a replacement dictionary with new keys
1358/// // for both.
1359/// writer.write(&batch2).unwrap();
1360/// writer.finish().unwrap();
1361/// ```
1362/// [IPC Streaming Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
1363pub struct StreamWriter<W> {
1364    /// The object to write to
1365    writer: W,
1366    /// IPC write options
1367    write_options: IpcWriteOptions,
    /// Whether the end-of-stream marker has been written, and the writer is finished
1369    finished: bool,
1370    /// Keeps track of dictionaries that have been written
1371    dictionary_tracker: DictionaryTracker,
1372
1373    data_gen: IpcDataGenerator,
1374
1375    compression_context: CompressionContext,
1376}
1377
1378impl<W: Write> StreamWriter<BufWriter<W>> {
1379    /// Try to create a new stream writer with the writer wrapped in a BufWriter.
1380    ///
1381    /// See [`StreamWriter::try_new`] for an unbuffered version.
1382    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1383        Self::try_new(BufWriter::new(writer), schema)
1384    }
1385}
1386
1387impl<W: Write> StreamWriter<W> {
1388    /// Try to create a new writer, with the schema written as part of the header.
1389    ///
1390    /// Note that there is no internal buffering. See also [`StreamWriter::try_new_buffered`].
1391    ///
1392    /// # Errors
1393    ///
1394    /// An [`Err`](Result::Err) may be returned if writing the header to the writer fails.
1395    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1396        let write_options = IpcWriteOptions::default();
1397        Self::try_new_with_options(writer, schema, write_options)
1398    }
1399
1400    /// Try to create a new writer with [`IpcWriteOptions`].
1401    ///
1402    /// # Errors
1403    ///
1404    /// An [`Err`](Result::Err) may be returned if writing the header to the writer fails.
1405    pub fn try_new_with_options(
1406        mut writer: W,
1407        schema: &Schema,
1408        write_options: IpcWriteOptions,
1409    ) -> Result<Self, ArrowError> {
1410        let data_gen = IpcDataGenerator::default();
1411        let mut dictionary_tracker = DictionaryTracker::new(false);
1412
1413        // write the schema as the first message in the stream
1414        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
1415            schema,
1416            &mut dictionary_tracker,
1417            &write_options,
1418        );
1419        write_message(&mut writer, encoded_message, &write_options)?;
1420        Ok(Self {
1421            writer,
1422            write_options,
1423            finished: false,
1424            dictionary_tracker,
1425            data_gen,
1426            compression_context: CompressionContext::default(),
1427        })
1428    }
1429
1430    /// Write a record batch to the stream
1431    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1432        if self.finished {
1433            return Err(ArrowError::IpcError(
1434                "Cannot write record batch to stream writer as it is closed".to_string(),
1435            ));
1436        }
1437
1438        let (encoded_dictionaries, encoded_message) = self
1439            .data_gen
1440            .encode(
1441                batch,
1442                &mut self.dictionary_tracker,
1443                &self.write_options,
1444                &mut self.compression_context,
1445            )
1446            .expect("StreamWriter is configured to not error on dictionary replacement");
1447
1448        for encoded_dictionary in encoded_dictionaries {
1449            write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
1450        }
1451
1452        write_message(&mut self.writer, encoded_message, &self.write_options)?;
1453        Ok(())
1454    }
1455
1456    /// Write continuation bytes, and mark the stream as done
1457    pub fn finish(&mut self) -> Result<(), ArrowError> {
1458        if self.finished {
1459            return Err(ArrowError::IpcError(
1460                "Cannot write footer to stream writer as it is closed".to_string(),
1461            ));
1462        }
1463
1464        write_continuation(&mut self.writer, &self.write_options, 0)?;
1465
1466        self.finished = true;
1467
1468        Ok(())
1469    }
1470
1471    /// Gets a reference to the underlying writer.
1472    pub fn get_ref(&self) -> &W {
1473        &self.writer
1474    }
1475
1476    /// Gets a mutable reference to the underlying writer.
1477    ///
1478    /// It is inadvisable to directly write to the underlying writer.
1479    pub fn get_mut(&mut self) -> &mut W {
1480        &mut self.writer
1481    }
1482
1483    /// Flush the underlying writer.
1484    ///
1485    /// If the writer is a [`BufWriter`], both it and the underlying writer are flushed.
1486    pub fn flush(&mut self) -> Result<(), ArrowError> {
1487        self.writer.flush()?;
1488        Ok(())
1489    }
1490
1491    /// Unwraps the underlying writer.
1492    ///
1493    /// The writer is flushed and the StreamWriter is finished before returning.
1494    ///
1495    /// # Errors
1496    ///
1497    /// An [`Err`](Result::Err) may be returned if an error occurs while finishing the StreamWriter
1498    /// or while flushing the writer.
1499    ///
1500    /// # Example
1501    ///
1502    /// ```
1503    /// # use arrow_ipc::writer::{StreamWriter, IpcWriteOptions};
1504    /// # use arrow_ipc::MetadataVersion;
1505    /// # use arrow_schema::{ArrowError, Schema};
1506    /// # fn main() -> Result<(), ArrowError> {
1507    /// // The result we expect from an empty schema
1508    /// let expected = vec![
1509    ///     255, 255, 255, 255,  48,   0,   0,   0,
1510    ///      16,   0,   0,   0,   0,   0,  10,   0,
1511    ///      12,   0,  10,   0,   9,   0,   4,   0,
1512    ///      10,   0,   0,   0,  16,   0,   0,   0,
1513    ///       0,   1,   4,   0,   8,   0,   8,   0,
1514    ///       0,   0,   4,   0,   8,   0,   0,   0,
1515    ///       4,   0,   0,   0,   0,   0,   0,   0,
1516    ///     255, 255, 255, 255,   0,   0,   0,   0
1517    /// ];
1518    ///
1519    /// let schema = Schema::empty();
1520    /// let buffer: Vec<u8> = Vec::new();
1521    /// let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)?;
1522    /// let stream_writer = StreamWriter::try_new_with_options(buffer, &schema, options)?;
1523    ///
1524    /// assert_eq!(stream_writer.into_inner()?, expected);
1525    /// # Ok(())
1526    /// # }
1527    /// ```
1528    pub fn into_inner(mut self) -> Result<W, ArrowError> {
1529        if !self.finished {
1530            // `finish` flushes.
1531            self.finish()?;
1532        }
1533        Ok(self.writer)
1534    }
1535}
1536
1537impl<W: Write> RecordBatchWriter for StreamWriter<W> {
1538    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1539        self.write(batch)
1540    }
1541
1542    fn close(mut self) -> Result<(), ArrowError> {
1543        self.finish()
1544    }
1545}
1546
1547/// Stores the encoded data, which is a `crate::Message`, and optional Arrow data
1548pub struct EncodedData {
1549    /// An encoded crate::Message
1550    pub ipc_message: Vec<u8>,
1551    /// Arrow buffers to be written, should be an empty vec for schema messages
1552    pub arrow_data: Vec<u8>,
1553}
1554/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
1555pub fn write_message<W: Write>(
1556    mut writer: W,
1557    encoded: EncodedData,
1558    write_options: &IpcWriteOptions,
1559) -> Result<(usize, usize), ArrowError> {
1560    let arrow_data_len = encoded.arrow_data.len();
1561    if arrow_data_len % usize::from(write_options.alignment) != 0 {
1562        return Err(ArrowError::MemoryError(
1563            "Arrow data not aligned".to_string(),
1564        ));
1565    }
1566
1567    let a = usize::from(write_options.alignment - 1);
1568    let buffer = encoded.ipc_message;
1569    let flatbuf_size = buffer.len();
1570    let prefix_size = if write_options.write_legacy_ipc_format {
1571        4
1572    } else {
1573        8
1574    };
1575    let aligned_size = (flatbuf_size + prefix_size + a) & !a;
1576    let padding_bytes = aligned_size - flatbuf_size - prefix_size;
1577
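    // For example (illustrative numbers): with the non-legacy 8-byte prefix, an 8-byte
    // alignment and a 10-byte flatbuffer, aligned_size = (10 + 8 + 7) & !7 = 24,
    // padding_bytes = 6, and the length prefix written below is 24 - 8 = 16
    // (flatbuffer plus padding).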
1578    write_continuation(
1579        &mut writer,
1580        write_options,
1581        (aligned_size - prefix_size) as i32,
1582    )?;
1583
1584    // write the flatbuf
1585    if flatbuf_size > 0 {
1586        writer.write_all(&buffer)?;
1587    }
1588    // write padding
1589    writer.write_all(&PADDING[..padding_bytes])?;
1590
1591    // write arrow data
1592    let body_len = if arrow_data_len > 0 {
1593        write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
1594    } else {
1595        0
1596    };
1597
1598    Ok((aligned_size, body_len))
1599}
1600
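/// Writes `data` followed by zero padding up to a multiple of `alignment`, flushing the
/// writer and returning the total number of bytes written (data plus padding).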
1601fn write_body_buffers<W: Write>(
1602    mut writer: W,
1603    data: &[u8],
1604    alignment: u8,
1605) -> Result<usize, ArrowError> {
1606    let len = data.len();
1607    let pad_len = pad_to_alignment(alignment, len);
1608    let total_len = len + pad_len;
1609
1610    // write body buffer
1611    writer.write_all(data)?;
1612    if pad_len > 0 {
1613        writer.write_all(&PADDING[..pad_len])?;
1614    }
1615
1616    writer.flush()?;
1617    Ok(total_len)
1618}
1619
1620/// Write the continuation marker and/or message length prefix preceding an IPC message,
1621/// depending on the metadata version and whether the legacy IPC format is used
1622fn write_continuation<W: Write>(
1623    mut writer: W,
1624    write_options: &IpcWriteOptions,
1625    total_len: i32,
1626) -> Result<usize, ArrowError> {
1627    let mut written = 8;
1628
1629    // the version of the writer determines whether continuation markers should be added
1630    match write_options.metadata_version {
1631        crate::MetadataVersion::V1 | crate::MetadataVersion::V2 | crate::MetadataVersion::V3 => {
1632            unreachable!("Options with these metadata versions cannot be created")
1633        }
1634        crate::MetadataVersion::V4 => {
1635            if !write_options.write_legacy_ipc_format {
1636                // v0.15.0 format
1637                writer.write_all(&CONTINUATION_MARKER)?;
1638                written = 4;
1639            }
1640            writer.write_all(&total_len.to_le_bytes()[..])?;
1641        }
1642        crate::MetadataVersion::V5 => {
1643            // write continuation marker and message length
1644            writer.write_all(&CONTINUATION_MARKER)?;
1645            writer.write_all(&total_len.to_le_bytes()[..])?;
1646        }
1647        z => panic!("Unsupported crate::MetadataVersion {z:?}"),
1648    };
1649
1650    writer.flush()?;
1651
1652    Ok(written)
1653}
1654
1655/// In V4, null types have no validity bitmap.
1656/// In V5 and later, null, union, and run-end encoded types have no
1657/// validity bitmap.
1658fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) -> bool {
1659    if write_options.metadata_version < crate::MetadataVersion::V5 {
1660        !matches!(data_type, DataType::Null)
1661    } else {
1662        !matches!(
1663            data_type,
1664            DataType::Null | DataType::Union(_, _) | DataType::RunEndEncoded(_, _)
1665        )
1666    }
1667}
1668
1669/// Whether to truncate the buffer
1670#[inline]
1671fn buffer_need_truncate(
1672    array_offset: usize,
1673    buffer: &Buffer,
1674    spec: &BufferSpec,
1675    min_length: usize,
1676) -> bool {
1677    spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
1678}
1679
1680/// Returns the byte width for a buffer spec, or 0 if the spec is not `BufferSpec::FixedWidth`.
1681#[inline]
1682fn get_buffer_element_width(spec: &BufferSpec) -> usize {
1683    match spec {
1684        BufferSpec::FixedWidth { byte_width, .. } => *byte_width,
1685        _ => 0,
1686    }
1687}
1688
1689/// Common functionality for re-encoding offsets. Returns the new offsets as well as
1690/// original start offset and length for use in slicing child data.
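///
/// For example, slicing an array with offsets `[0, 3, 5, 9]` to `offset = 1` and
/// `len = 2` gives the offset window `[3, 5, 9]`; since it does not start at zero it is
/// re-encoded to `[0, 2, 6]`, and `(3, 6)` is returned as the start offset and length
/// to use when slicing the values or child data.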
1691fn reencode_offsets<O: OffsetSizeTrait>(
1692    offsets: &Buffer,
1693    data: &ArrayData,
1694) -> (Buffer, usize, usize) {
1695    let offsets_slice: &[O] = offsets.typed_data::<O>();
1696    let offset_slice = &offsets_slice[data.offset()..data.offset() + data.len() + 1];
1697
1698    let start_offset = offset_slice.first().unwrap();
1699    let end_offset = offset_slice.last().unwrap();
1700
1701    let offsets = match start_offset.as_usize() {
1702        0 => {
1703            let size = size_of::<O>();
1704            offsets.slice_with_length(data.offset() * size, (data.len() + 1) * size)
1705        }
1706        _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
1707    };
1708
1709    let start_offset = start_offset.as_usize();
1710    let end_offset = end_offset.as_usize();
1711
1712    (offsets, start_offset, end_offset - start_offset)
1713}
1714
1715/// Returns the values and offsets [`Buffer`] for a ByteArray with offset type `O`
1716///
1717/// In particular, this handles re-encoding the offsets if they don't start at `0`,
1718/// slicing the values buffer as appropriate. This helps reduce the encoded
1719/// size of sliced arrays, as values that have been sliced away are not encoded
1720fn get_byte_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Buffer) {
1721    if data.is_empty() {
1722        return (MutableBuffer::new(0).into(), MutableBuffer::new(0).into());
1723    }
1724
1725    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1726    let values = data.buffers()[1].slice_with_length(original_start_offset, len);
1727    (offsets, values)
1728}
1729
1730/// Similar logic as [`get_byte_array_buffers()`] but slices the child array instead
1731/// of a values buffer.
1732fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, ArrayData) {
1733    if data.is_empty() {
1734        return (
1735            MutableBuffer::new(0).into(),
1736            data.child_data()[0].slice(0, 0),
1737        );
1738    }
1739
1740    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1741    let child_data = data.child_data()[0].slice(original_start_offset, len);
1742    (offsets, child_data)
1743}
1744
1745/// Returns the offsets, sizes, and child data buffers for a ListView array.
1746///
1747/// Unlike List arrays, ListView arrays store both offsets and sizes explicitly,
1748/// and offsets can be non-monotonic. When slicing, we simply pass through the
1749/// offsets and sizes without re-encoding, and do not slice the child data.
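///
/// For example, a ListView array sliced to `offset = 1`, `len = 2` yields the offsets
/// and sizes entries for rows 1 and 2, plus the entire child array, since the retained
/// offsets may point anywhere within it.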
1750fn get_list_view_array_buffers<O: OffsetSizeTrait>(
1751    data: &ArrayData,
1752) -> (Buffer, Buffer, ArrayData) {
1753    if data.is_empty() {
1754        return (
1755            MutableBuffer::new(0).into(),
1756            MutableBuffer::new(0).into(),
1757            data.child_data()[0].slice(0, 0),
1758        );
1759    }
1760
1761    let offsets = &data.buffers()[0];
1762    let sizes = &data.buffers()[1];
1763
1764    let element_size = std::mem::size_of::<O>();
1765    let offsets_slice =
1766        offsets.slice_with_length(data.offset() * element_size, data.len() * element_size);
1767    let sizes_slice =
1768        sizes.slice_with_length(data.offset() * element_size, data.len() * element_size);
1769
1770    let child_data = data.child_data()[0].clone();
1771
1772    (offsets_slice, sizes_slice, child_data)
1773}
1774
1775/// Returns the first buffer of `array_data`, truncated to the array's offset and length
1776/// where possible.
1777///
1778/// This is used for the views buffer of BinaryView/Utf8View arrays as well as the values
1779/// buffer of fixed-width arrays (numeric, temporal, FixedSizeBinary, dictionary keys).
1780/// Truncating helps reduce the encoded size of sliced arrays.
1781fn get_or_truncate_buffer(array_data: &ArrayData) -> &[u8] {
1782    let buffer = &array_data.buffers()[0];
1783    let layout = layout(array_data.data_type());
1784    let spec = &layout.buffers[0];
1785
1786    let byte_width = get_buffer_element_width(spec);
1787    let min_length = array_data.len() * byte_width;
1788    if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
1789        let byte_offset = array_data.offset() * byte_width;
1790        let buffer_length = min(min_length, buffer.len() - byte_offset);
1791        &buffer.as_slice()[byte_offset..(byte_offset + buffer_length)]
1792    } else {
1793        buffer.as_slice()
1794    }
1795}
1796
1797/// Write array data to a vector of bytes
1798#[allow(clippy::too_many_arguments)]
1799fn write_array_data(
1800    array_data: &ArrayData,
1801    buffers: &mut Vec<crate::Buffer>,
1802    arrow_data: &mut Vec<u8>,
1803    nodes: &mut Vec<crate::FieldNode>,
1804    offset: i64,
1805    num_rows: usize,
1806    null_count: usize,
1807    compression_codec: Option<CompressionCodec>,
1808    compression_context: &mut CompressionContext,
1809    write_options: &IpcWriteOptions,
1810) -> Result<i64, ArrowError> {
1811    let mut offset = offset;
1812    if !matches!(array_data.data_type(), DataType::Null) {
1813        nodes.push(crate::FieldNode::new(num_rows as i64, null_count as i64));
1814    } else {
1815        // NullArray's null_count equals its len, but the `null_count` passed in is from ArrayData
1816        // where null_count is always 0.
1817        nodes.push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
1818    }
1819    if has_validity_bitmap(array_data.data_type(), write_options) {
1820        // write null buffer if exists
1821        let null_buffer = match array_data.nulls() {
1822            None => {
1823                // create a buffer and fill it with valid bits
1824                let num_bytes = bit_util::ceil(num_rows, 8);
1825                let buffer = MutableBuffer::new(num_bytes);
1826                let buffer = buffer.with_bitset(num_bytes, true);
1827                buffer.into()
1828            }
1829            Some(buffer) => buffer.inner().sliced(),
1830        };
1831
1832        offset = write_buffer(
1833            null_buffer.as_slice(),
1834            buffers,
1835            arrow_data,
1836            offset,
1837            compression_codec,
1838            compression_context,
1839            write_options.alignment,
1840        )?;
1841    }
1842
1843    let data_type = array_data.data_type();
1844    if matches!(data_type, DataType::Binary | DataType::Utf8) {
1845        let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
1846        for buffer in [offsets, values] {
1847            offset = write_buffer(
1848                buffer.as_slice(),
1849                buffers,
1850                arrow_data,
1851                offset,
1852                compression_codec,
1853                compression_context,
1854                write_options.alignment,
1855            )?;
1856        }
1857    } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
1858        // Slicing the views buffer is safe and easy, but pruning unneeded data buffers
1859        // is much more nuanced, since it's complicated to prove that no views reference the pruned buffers.
1860        //
1861        // The current implementation just serializes the raw arrays as given and does not try to optimize anything.
1862        // If users want to "compact" the arrays prior to sending them over IPC,
1863        // they should consider the gc API suggested in #5513.
1864        let views = get_or_truncate_buffer(array_data);
1865        offset = write_buffer(
1866            views,
1867            buffers,
1868            arrow_data,
1869            offset,
1870            compression_codec,
1871            compression_context,
1872            write_options.alignment,
1873        )?;
1874
1875        for buffer in array_data.buffers().iter().skip(1) {
1876            offset = write_buffer(
1877                buffer.as_slice(),
1878                buffers,
1879                arrow_data,
1880                offset,
1881                compression_codec,
1882                compression_context,
1883                write_options.alignment,
1884            )?;
1885        }
1886    } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
1887        let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
1888        for buffer in [offsets, values] {
1889            offset = write_buffer(
1890                buffer.as_slice(),
1891                buffers,
1892                arrow_data,
1893                offset,
1894                compression_codec,
1895                compression_context,
1896                write_options.alignment,
1897            )?;
1898        }
1899    } else if DataType::is_numeric(data_type)
1900        || DataType::is_temporal(data_type)
1901        || matches!(
1902            array_data.data_type(),
1903            DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
1904        )
1905    {
1906        // Truncate values
1907        assert_eq!(array_data.buffers().len(), 1);
1908
1909        let buffer = get_or_truncate_buffer(array_data);
1910        offset = write_buffer(
1911            buffer,
1912            buffers,
1913            arrow_data,
1914            offset,
1915            compression_codec,
1916            compression_context,
1917            write_options.alignment,
1918        )?;
1919    } else if matches!(data_type, DataType::Boolean) {
1920        // Bools are special because the payload (= 1 bit) is smaller than the physical container elements (= bytes).
1921        // The array data may not start at the physical boundary of the underlying buffer, so we need to shift bits around.
1922        assert_eq!(array_data.buffers().len(), 1);
1923
1924        let buffer = &array_data.buffers()[0];
1925        let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
1926        offset = write_buffer(
1927            &buffer,
1928            buffers,
1929            arrow_data,
1930            offset,
1931            compression_codec,
1932            compression_context,
1933            write_options.alignment,
1934        )?;
1935    } else if matches!(
1936        data_type,
1937        DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _)
1938    ) {
1939        assert_eq!(array_data.buffers().len(), 1);
1940        assert_eq!(array_data.child_data().len(), 1);
1941
1942        // Truncate offsets and the child data to avoid writing unnecessary data
1943        let (offsets, sliced_child_data) = match data_type {
1944            DataType::List(_) => get_list_array_buffers::<i32>(array_data),
1945            DataType::Map(_, _) => get_list_array_buffers::<i32>(array_data),
1946            DataType::LargeList(_) => get_list_array_buffers::<i64>(array_data),
1947            _ => unreachable!(),
1948        };
1949        offset = write_buffer(
1950            offsets.as_slice(),
1951            buffers,
1952            arrow_data,
1953            offset,
1954            compression_codec,
1955            compression_context,
1956            write_options.alignment,
1957        )?;
1958        offset = write_array_data(
1959            &sliced_child_data,
1960            buffers,
1961            arrow_data,
1962            nodes,
1963            offset,
1964            sliced_child_data.len(),
1965            sliced_child_data.null_count(),
1966            compression_codec,
1967            compression_context,
1968            write_options,
1969        )?;
1970        return Ok(offset);
1971    } else if matches!(
1972        data_type,
1973        DataType::ListView(_) | DataType::LargeListView(_)
1974    ) {
1975        assert_eq!(array_data.buffers().len(), 2); // offsets + sizes
1976        assert_eq!(array_data.child_data().len(), 1);
1977
1978        let (offsets, sizes, child_data) = match data_type {
1979            DataType::ListView(_) => get_list_view_array_buffers::<i32>(array_data),
1980            DataType::LargeListView(_) => get_list_view_array_buffers::<i64>(array_data),
1981            _ => unreachable!(),
1982        };
1983
1984        offset = write_buffer(
1985            offsets.as_slice(),
1986            buffers,
1987            arrow_data,
1988            offset,
1989            compression_codec,
1990            compression_context,
1991            write_options.alignment,
1992        )?;
1993
1994        offset = write_buffer(
1995            sizes.as_slice(),
1996            buffers,
1997            arrow_data,
1998            offset,
1999            compression_codec,
2000            compression_context,
2001            write_options.alignment,
2002        )?;
2003
2004        offset = write_array_data(
2005            &child_data,
2006            buffers,
2007            arrow_data,
2008            nodes,
2009            offset,
2010            child_data.len(),
2011            child_data.null_count(),
2012            compression_codec,
2013            compression_context,
2014            write_options,
2015        )?;
2016        return Ok(offset);
2017    } else if let DataType::FixedSizeList(_, fixed_size) = data_type {
2018        assert_eq!(array_data.child_data().len(), 1);
2019        let fixed_size = *fixed_size as usize;
2020
2021        let child_offset = array_data.offset() * fixed_size;
2022        let child_length = array_data.len() * fixed_size;
2023        let child_data = array_data.child_data()[0].slice(child_offset, child_length);
2024
2025        offset = write_array_data(
2026            &child_data,
2027            buffers,
2028            arrow_data,
2029            nodes,
2030            offset,
2031            child_data.len(),
2032            child_data.null_count(),
2033            compression_codec,
2034            compression_context,
2035            write_options,
2036        )?;
2037        return Ok(offset);
2038    } else {
2039        for buffer in array_data.buffers() {
2040            offset = write_buffer(
2041                buffer,
2042                buffers,
2043                arrow_data,
2044                offset,
2045                compression_codec,
2046                compression_context,
2047                write_options.alignment,
2048            )?;
2049        }
2050    }
2051
2052    match array_data.data_type() {
2053        DataType::Dictionary(_, _) => {}
2054        DataType::RunEndEncoded(_, _) => {
2055            // unslice the run encoded array.
2056            let arr = unslice_run_array(array_data.clone())?;
2057            // recursively write out nested structures
2058            for data_ref in arr.child_data() {
2059                // write the nested data (e.g list data)
2060                offset = write_array_data(
2061                    data_ref,
2062                    buffers,
2063                    arrow_data,
2064                    nodes,
2065                    offset,
2066                    data_ref.len(),
2067                    data_ref.null_count(),
2068                    compression_codec,
2069                    compression_context,
2070                    write_options,
2071                )?;
2072            }
2073        }
2074        _ => {
2075            // recursively write out nested structures
2076            for data_ref in array_data.child_data() {
2077                // write the nested data (e.g list data)
2078                offset = write_array_data(
2079                    data_ref,
2080                    buffers,
2081                    arrow_data,
2082                    nodes,
2083                    offset,
2084                    data_ref.len(),
2085                    data_ref.null_count(),
2086                    compression_codec,
2087                    compression_context,
2088                    write_options,
2089                )?;
2090            }
2091        }
2092    }
2093    Ok(offset)
2094}
2095
2096/// Writes a buffer into `arrow_data`, a vector of bytes, and adds its
2097/// [`crate::Buffer`] to `buffers`. Returns the new offset in `arrow_data`.
2098///
2099///
2100/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
2101/// Each constituent buffer is first compressed with the indicated
2102/// compressor, and then written with the uncompressed length in the first 8
2103/// bytes as a 64-bit little-endian signed integer followed by the compressed
2104/// buffer bytes (and then padding as required by the protocol). The
2105/// uncompressed length may be set to -1 to indicate that the data that
2106/// follows is not compressed, which can be useful for cases where
2107/// compression does not yield appreciable savings.
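///
/// For example, with `alignment = 8` and no compression, a 10-byte `buffer` written at
/// `offset = 0` appends 10 data bytes plus 6 padding bytes to `arrow_data` and returns
/// a new offset of 16.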
2108fn write_buffer(
2109    buffer: &[u8],                    // input
2110    buffers: &mut Vec<crate::Buffer>, // output buffer descriptors
2111    arrow_data: &mut Vec<u8>,         // output stream
2112    offset: i64,                      // current output stream offset
2113    compression_codec: Option<CompressionCodec>,
2114    compression_context: &mut CompressionContext,
2115    alignment: u8,
2116) -> Result<i64, ArrowError> {
2117    let len: i64 = match compression_codec {
2118        Some(compressor) => compressor.compress_to_vec(buffer, arrow_data, compression_context)?,
2119        None => {
2120            arrow_data.extend_from_slice(buffer);
2121            buffer.len()
2122        }
2123    }
2124    .try_into()
2125    .map_err(|e| {
2126        ArrowError::InvalidArgumentError(format!("Could not convert compressed size to i64: {e}"))
2127    })?;
2128
2129    // make new index entry
2130    buffers.push(crate::Buffer::new(offset, len));
2131    // padding and make offset aligned
2132    let pad_len = pad_to_alignment(alignment, len as usize);
2133    arrow_data.extend_from_slice(&PADDING[..pad_len]);
2134
2135    Ok(offset + len + (pad_len as i64))
2136}
2137
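/// Scratch source of zero bytes used when writing alignment padding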
2138const PADDING: [u8; 64] = [0; 64];
2139
2140/// Returns the number of padding bytes needed to round `len` up to the next multiple of `alignment`.
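///
/// For example, `pad_to_alignment(8, 13)` returns `3`, so that `13 + 3` is a multiple of 8.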
2141#[inline]
2142fn pad_to_alignment(alignment: u8, len: usize) -> usize {
2143    let a = usize::from(alignment - 1);
2144    ((len + a) & !a) - len
2145}
2146
2147#[cfg(test)]
2148mod tests {
2149    use std::hash::Hasher;
2150    use std::io::Cursor;
2151    use std::io::Seek;
2152
2153    use arrow_array::builder::FixedSizeListBuilder;
2154    use arrow_array::builder::Float32Builder;
2155    use arrow_array::builder::Int64Builder;
2156    use arrow_array::builder::MapBuilder;
2157    use arrow_array::builder::StringViewBuilder;
2158    use arrow_array::builder::UnionBuilder;
2159    use arrow_array::builder::{
2160        GenericListBuilder, GenericListViewBuilder, ListBuilder, StringBuilder,
2161    };
2162    use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
2163    use arrow_array::types::*;
2164    use arrow_buffer::ScalarBuffer;
2165
2166    use crate::MetadataVersion;
2167    use crate::convert::fb_to_schema;
2168    use crate::reader::*;
2169    use crate::root_as_footer;
2170
2171    use super::*;
2172
2173    fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
2174        let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
2175        writer.write(rb).unwrap();
2176        writer.finish().unwrap();
2177        writer.into_inner().unwrap()
2178    }
2179
2180    fn deserialize_file(bytes: Vec<u8>) -> RecordBatch {
2181        let mut reader = FileReader::try_new(Cursor::new(bytes), None).unwrap();
2182        reader.next().unwrap().unwrap()
2183    }
2184
2185    fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
2186        // Use 8-byte alignment so that the various `truncate_*` tests can be compactly written,
2187        // without needing to construct a giant array to spill over the 64-byte default alignment
2188        // boundary.
2189        const IPC_ALIGNMENT: usize = 8;
2190
2191        let mut stream_writer = StreamWriter::try_new_with_options(
2192            vec![],
2193            record.schema_ref(),
2194            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
2195        )
2196        .unwrap();
2197        stream_writer.write(record).unwrap();
2198        stream_writer.finish().unwrap();
2199        stream_writer.into_inner().unwrap()
2200    }
2201
2202    fn deserialize_stream(bytes: Vec<u8>) -> RecordBatch {
2203        let mut stream_reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
2204        stream_reader.next().unwrap().unwrap()
2205    }
2206
2207    #[test]
2208    #[cfg(feature = "lz4")]
2209    fn test_write_empty_record_batch_lz4_compression() {
2210        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2211        let values: Vec<Option<i32>> = vec![];
2212        let array = Int32Array::from(values);
2213        let record_batch =
2214            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2215
2216        let mut file = tempfile::tempfile().unwrap();
2217
2218        {
2219            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2220                .unwrap()
2221                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
2222                .unwrap();
2223
2224            let mut writer =
2225                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2226            writer.write(&record_batch).unwrap();
2227            writer.finish().unwrap();
2228        }
2229        file.rewind().unwrap();
2230        {
2231            // read file
2232            let reader = FileReader::try_new(file, None).unwrap();
2233            for read_batch in reader {
2234                read_batch
2235                    .unwrap()
2236                    .columns()
2237                    .iter()
2238                    .zip(record_batch.columns())
2239                    .for_each(|(a, b)| {
2240                        assert_eq!(a.data_type(), b.data_type());
2241                        assert_eq!(a.len(), b.len());
2242                        assert_eq!(a.null_count(), b.null_count());
2243                    });
2244            }
2245        }
2246    }
2247
2248    #[test]
2249    #[cfg(feature = "lz4")]
2250    fn test_write_file_with_lz4_compression() {
2251        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2252        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
2253        let array = Int32Array::from(values);
2254        let record_batch =
2255            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2256
2257        let mut file = tempfile::tempfile().unwrap();
2258        {
2259            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2260                .unwrap()
2261                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
2262                .unwrap();
2263
2264            let mut writer =
2265                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2266            writer.write(&record_batch).unwrap();
2267            writer.finish().unwrap();
2268        }
2269        file.rewind().unwrap();
2270        {
2271            // read file
2272            let reader = FileReader::try_new(file, None).unwrap();
2273            for read_batch in reader {
2274                read_batch
2275                    .unwrap()
2276                    .columns()
2277                    .iter()
2278                    .zip(record_batch.columns())
2279                    .for_each(|(a, b)| {
2280                        assert_eq!(a.data_type(), b.data_type());
2281                        assert_eq!(a.len(), b.len());
2282                        assert_eq!(a.null_count(), b.null_count());
2283                    });
2284            }
2285        }
2286    }
2287
2288    #[test]
2289    #[cfg(feature = "zstd")]
2290    fn test_write_file_with_zstd_compression() {
2291        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2292        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
2293        let array = Int32Array::from(values);
2294        let record_batch =
2295            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2296        let mut file = tempfile::tempfile().unwrap();
2297        {
2298            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2299                .unwrap()
2300                .try_with_compression(Some(crate::CompressionType::ZSTD))
2301                .unwrap();
2302
2303            let mut writer =
2304                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2305            writer.write(&record_batch).unwrap();
2306            writer.finish().unwrap();
2307        }
2308        file.rewind().unwrap();
2309        {
2310            // read file
2311            let reader = FileReader::try_new(file, None).unwrap();
2312            for read_batch in reader {
2313                read_batch
2314                    .unwrap()
2315                    .columns()
2316                    .iter()
2317                    .zip(record_batch.columns())
2318                    .for_each(|(a, b)| {
2319                        assert_eq!(a.data_type(), b.data_type());
2320                        assert_eq!(a.len(), b.len());
2321                        assert_eq!(a.null_count(), b.null_count());
2322                    });
2323            }
2324        }
2325    }
2326
2327    #[test]
2328    fn test_write_file() {
2329        let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
2330        let values: Vec<Option<u32>> = vec![
2331            Some(999),
2332            None,
2333            Some(235),
2334            Some(123),
2335            None,
2336            None,
2337            None,
2338            None,
2339            None,
2340        ];
2341        let array1 = UInt32Array::from(values);
2342        let batch =
2343            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array1) as ArrayRef])
2344                .unwrap();
2345        let mut file = tempfile::tempfile().unwrap();
2346        {
2347            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2348
2349            writer.write(&batch).unwrap();
2350            writer.finish().unwrap();
2351        }
2352        file.rewind().unwrap();
2353
2354        {
2355            let mut reader = FileReader::try_new(file, None).unwrap();
2356            while let Some(Ok(read_batch)) = reader.next() {
2357                read_batch
2358                    .columns()
2359                    .iter()
2360                    .zip(batch.columns())
2361                    .for_each(|(a, b)| {
2362                        assert_eq!(a.data_type(), b.data_type());
2363                        assert_eq!(a.len(), b.len());
2364                        assert_eq!(a.null_count(), b.null_count());
2365                    });
2366            }
2367        }
2368    }
2369
2370    fn write_null_file(options: IpcWriteOptions) {
2371        let schema = Schema::new(vec![
2372            Field::new("nulls", DataType::Null, true),
2373            Field::new("int32s", DataType::Int32, false),
2374            Field::new("nulls2", DataType::Null, true),
2375            Field::new("f64s", DataType::Float64, false),
2376        ]);
2377        let array1 = NullArray::new(32);
2378        let array2 = Int32Array::from(vec![1; 32]);
2379        let array3 = NullArray::new(32);
2380        let array4 = Float64Array::from(vec![f64::NAN; 32]);
2381        let batch = RecordBatch::try_new(
2382            Arc::new(schema.clone()),
2383            vec![
2384                Arc::new(array1) as ArrayRef,
2385                Arc::new(array2) as ArrayRef,
2386                Arc::new(array3) as ArrayRef,
2387                Arc::new(array4) as ArrayRef,
2388            ],
2389        )
2390        .unwrap();
2391        let mut file = tempfile::tempfile().unwrap();
2392        {
2393            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2394
2395            writer.write(&batch).unwrap();
2396            writer.finish().unwrap();
2397        }
2398
2399        file.rewind().unwrap();
2400
2401        {
2402            let reader = FileReader::try_new(file, None).unwrap();
2403            reader.for_each(|maybe_batch| {
2404                maybe_batch
2405                    .unwrap()
2406                    .columns()
2407                    .iter()
2408                    .zip(batch.columns())
2409                    .for_each(|(a, b)| {
2410                        assert_eq!(a.data_type(), b.data_type());
2411                        assert_eq!(a.len(), b.len());
2412                        assert_eq!(a.null_count(), b.null_count());
2413                    });
2414            });
2415        }
2416    }
2417    #[test]
2418    fn test_write_null_file_v4() {
2419        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2420        write_null_file(IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap());
2421        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V4).unwrap());
2422        write_null_file(IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap());
2423    }
2424
2425    #[test]
2426    fn test_write_null_file_v5() {
2427        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2428        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V5).unwrap());
2429    }
2430
2431    #[test]
2432    fn track_union_nested_dict() {
2433        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2434
2435        let array = Arc::new(inner) as ArrayRef;
2436
2437        // Dict field with id 0
2438        #[allow(deprecated)]
2439        let dctfield = Field::new_dict("dict", array.data_type().clone(), false, 0, false);
2440        let union_fields = [(0, Arc::new(dctfield))].into_iter().collect();
2441
2442        let types = [0, 0, 0].into_iter().collect::<ScalarBuffer<i8>>();
2443        let offsets = [0, 1, 2].into_iter().collect::<ScalarBuffer<i32>>();
2444
2445        let union = UnionArray::try_new(union_fields, types, Some(offsets), vec![array]).unwrap();
2446
2447        let schema = Arc::new(Schema::new(vec![Field::new(
2448            "union",
2449            union.data_type().clone(),
2450            false,
2451        )]));
2452
2453        let r#gen = IpcDataGenerator::default();
2454        let mut dict_tracker = DictionaryTracker::new(false);
2455        r#gen.schema_to_bytes_with_dictionary_tracker(
2456            &schema,
2457            &mut dict_tracker,
2458            &IpcWriteOptions::default(),
2459        );
2460
2461        let batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
2462
2463        r#gen
2464            .encode(
2465                &batch,
2466                &mut dict_tracker,
2467                &Default::default(),
2468                &mut Default::default(),
2469            )
2470            .unwrap();
2471
2472        // The encoder will assign dict IDs itself to ensure uniqueness and ignore the dict ID in the schema
2473        // so we expect the dict will be keyed to 0
2474        assert!(dict_tracker.written.contains_key(&0));
2475    }
2476
2477    #[test]
2478    fn track_struct_nested_dict() {
2479        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2480
2481        let array = Arc::new(inner) as ArrayRef;
2482
2483        // Dict field with id 2
2484        #[allow(deprecated)]
2485        let dctfield = Arc::new(Field::new_dict(
2486            "dict",
2487            array.data_type().clone(),
2488            false,
2489            2,
2490            false,
2491        ));
2492
2493        let s = StructArray::from(vec![(dctfield, array)]);
2494        let struct_array = Arc::new(s) as ArrayRef;
2495
2496        let schema = Arc::new(Schema::new(vec![Field::new(
2497            "struct",
2498            struct_array.data_type().clone(),
2499            false,
2500        )]));
2501
2502        let r#gen = IpcDataGenerator::default();
2503        let mut dict_tracker = DictionaryTracker::new(false);
2504        r#gen.schema_to_bytes_with_dictionary_tracker(
2505            &schema,
2506            &mut dict_tracker,
2507            &IpcWriteOptions::default(),
2508        );
2509
2510        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2511
2512        r#gen
2513            .encode(
2514                &batch,
2515                &mut dict_tracker,
2516                &Default::default(),
2517                &mut Default::default(),
2518            )
2519            .unwrap();
2520
2521        assert!(dict_tracker.written.contains_key(&0));
2522    }
2523
2524    fn write_union_file(options: IpcWriteOptions) {
2525        let schema = Schema::new(vec![Field::new_union(
2526            "union",
2527            vec![0, 1],
2528            vec![
2529                Field::new("a", DataType::Int32, false),
2530                Field::new("c", DataType::Float64, false),
2531            ],
2532            UnionMode::Sparse,
2533        )]);
2534        let mut builder = UnionBuilder::with_capacity_sparse(5);
2535        builder.append::<Int32Type>("a", 1).unwrap();
2536        builder.append_null::<Int32Type>("a").unwrap();
2537        builder.append::<Float64Type>("c", 3.0).unwrap();
2538        builder.append_null::<Float64Type>("c").unwrap();
2539        builder.append::<Int32Type>("a", 4).unwrap();
2540        let union = builder.build().unwrap();
2541
2542        let batch =
2543            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(union) as ArrayRef])
2544                .unwrap();
2545
2546        let mut file = tempfile::tempfile().unwrap();
2547        {
2548            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2549
2550            writer.write(&batch).unwrap();
2551            writer.finish().unwrap();
2552        }
2553        file.rewind().unwrap();
2554
2555        {
2556            let reader = FileReader::try_new(file, None).unwrap();
2557            reader.for_each(|maybe_batch| {
2558                maybe_batch
2559                    .unwrap()
2560                    .columns()
2561                    .iter()
2562                    .zip(batch.columns())
2563                    .for_each(|(a, b)| {
2564                        assert_eq!(a.data_type(), b.data_type());
2565                        assert_eq!(a.len(), b.len());
2566                        assert_eq!(a.null_count(), b.null_count());
2567                    });
2568            });
2569        }
2570    }
2571
2572    #[test]
2573    fn test_write_union_file_v4_v5() {
2574        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2575        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2576    }
2577
2578    #[test]
2579    fn test_write_view_types() {
2580        const LONG_TEST_STRING: &str =
2581            "This is a long string to make sure binary view array handles it";
2582        let schema = Schema::new(vec![
2583            Field::new("field1", DataType::BinaryView, true),
2584            Field::new("field2", DataType::Utf8View, true),
2585        ]);
2586        let values: Vec<Option<&[u8]>> = vec![
2587            Some(b"foo"),
2588            Some(b"bar"),
2589            Some(LONG_TEST_STRING.as_bytes()),
2590        ];
2591        let binary_array = BinaryViewArray::from_iter(values);
2592        let utf8_array =
2593            StringViewArray::from_iter(vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]);
2594        let record_batch = RecordBatch::try_new(
2595            Arc::new(schema.clone()),
2596            vec![Arc::new(binary_array), Arc::new(utf8_array)],
2597        )
2598        .unwrap();
2599
2600        let mut file = tempfile::tempfile().unwrap();
2601        {
2602            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2603            writer.write(&record_batch).unwrap();
2604            writer.finish().unwrap();
2605        }
2606        file.rewind().unwrap();
2607        {
2608            let mut reader = FileReader::try_new(&file, None).unwrap();
2609            let read_batch = reader.next().unwrap().unwrap();
2610            read_batch
2611                .columns()
2612                .iter()
2613                .zip(record_batch.columns())
2614                .for_each(|(a, b)| {
2615                    assert_eq!(a, b);
2616                });
2617        }
2618        file.rewind().unwrap();
2619        {
2620            let mut reader = FileReader::try_new(&file, Some(vec![0])).unwrap();
2621            let read_batch = reader.next().unwrap().unwrap();
2622            assert_eq!(read_batch.num_columns(), 1);
2623            let read_array = read_batch.column(0);
2624            let write_array = record_batch.column(0);
2625            assert_eq!(read_array, write_array);
2626        }
2627    }
2628
2629    #[test]
2630    fn truncate_ipc_record_batch() {
2631        fn create_batch(rows: usize) -> RecordBatch {
2632            let schema = Schema::new(vec![
2633                Field::new("a", DataType::Int32, false),
2634                Field::new("b", DataType::Utf8, false),
2635            ]);
2636
2637            let a = Int32Array::from_iter_values(0..rows as i32);
2638            let b = StringArray::from_iter_values((0..rows).map(|i| i.to_string()));
2639
2640            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
2641        }
2642
2643        let big_record_batch = create_batch(65536);
2644
2645        let length = 5;
2646        let small_record_batch = create_batch(length);
2647
2648        let offset = 2;
2649        let record_batch_slice = big_record_batch.slice(offset, length);
2650        assert!(
2651            serialize_stream(&big_record_batch).len() > serialize_stream(&small_record_batch).len()
2652        );
2653        assert_eq!(
2654            serialize_stream(&small_record_batch).len(),
2655            serialize_stream(&record_batch_slice).len()
2656        );
2657
2658        assert_eq!(
2659            deserialize_stream(serialize_stream(&record_batch_slice)),
2660            record_batch_slice
2661        );
2662    }
2663
2664    #[test]
2665    fn truncate_ipc_record_batch_with_nulls() {
2666        fn create_batch() -> RecordBatch {
2667            let schema = Schema::new(vec![
2668                Field::new("a", DataType::Int32, true),
2669                Field::new("b", DataType::Utf8, true),
2670            ]);
2671
2672            let a = Int32Array::from(vec![Some(1), None, Some(1), None, Some(1)]);
2673            let b = StringArray::from(vec![None, Some("a"), Some("a"), None, Some("a")]);
2674
2675            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
2676        }
2677
2678        let record_batch = create_batch();
2679        let record_batch_slice = record_batch.slice(1, 2);
2680        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2681
2682        assert!(
2683            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2684        );
2685
2686        assert!(deserialized_batch.column(0).is_null(0));
2687        assert!(deserialized_batch.column(0).is_valid(1));
2688        assert!(deserialized_batch.column(1).is_valid(0));
2689        assert!(deserialized_batch.column(1).is_valid(1));
2690
2691        assert_eq!(record_batch_slice, deserialized_batch);
2692    }
2693
2694    #[test]
2695    fn truncate_ipc_dictionary_array() {
2696        fn create_batch() -> RecordBatch {
2697            let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
2698                .into_iter()
2699                .collect();
2700            let keys: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
2701
2702            let array = DictionaryArray::new(keys, Arc::new(values));
2703
2704            let schema = Schema::new(vec![Field::new("dict", array.data_type().clone(), true)]);
2705
2706            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
2707        }
2708
2709        let record_batch = create_batch();
2710        let record_batch_slice = record_batch.slice(1, 2);
2711        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2712
2713        assert!(
2714            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2715        );
2716
2717        assert!(deserialized_batch.column(0).is_valid(0));
2718        assert!(deserialized_batch.column(0).is_null(1));
2719
2720        assert_eq!(record_batch_slice, deserialized_batch);
2721    }
2722
2723    #[test]
2724    fn truncate_ipc_struct_array() {
2725        fn create_batch() -> RecordBatch {
2726            let strings: StringArray = [Some("foo"), None, Some("bar"), Some("baz")]
2727                .into_iter()
2728                .collect();
2729            let ints: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
2730
2731            let struct_array = StructArray::from(vec![
2732                (
2733                    Arc::new(Field::new("s", DataType::Utf8, true)),
2734                    Arc::new(strings) as ArrayRef,
2735                ),
2736                (
2737                    Arc::new(Field::new("c", DataType::Int32, true)),
2738                    Arc::new(ints) as ArrayRef,
2739                ),
2740            ]);
2741
2742            let schema = Schema::new(vec![Field::new(
2743                "struct_array",
2744                struct_array.data_type().clone(),
2745                true,
2746            )]);
2747
2748            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(struct_array)]).unwrap()
2749        }
2750
2751        let record_batch = create_batch();
2752        let record_batch_slice = record_batch.slice(1, 2);
2753        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2754
2755        assert!(
2756            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2757        );
2758
2759        let structs = deserialized_batch
2760            .column(0)
2761            .as_any()
2762            .downcast_ref::<StructArray>()
2763            .unwrap();
2764
2765        assert!(structs.column(0).is_null(0));
2766        assert!(structs.column(0).is_valid(1));
2767        assert!(structs.column(1).is_valid(0));
2768        assert!(structs.column(1).is_null(1));
2769        assert_eq!(record_batch_slice, deserialized_batch);
2770    }
2771
2772    #[test]
2773    fn truncate_ipc_string_array_with_all_empty_string() {
2774        fn create_batch() -> RecordBatch {
2775            let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
2776            let a = StringArray::from(vec![Some(""), Some(""), Some(""), Some(""), Some("")]);
2777            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap()
2778        }
2779
2780        let record_batch = create_batch();
2781        let record_batch_slice = record_batch.slice(0, 1);
2782        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2783
2784        assert!(
2785            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2786        );
2787        assert_eq!(record_batch_slice, deserialized_batch);
2788    }
2789
2790    #[test]
2791    fn test_stream_writer_writes_array_slice() {
2792        let array = UInt32Array::from(vec![Some(1), Some(2), Some(3)]);
2793        assert_eq!(
2794            vec![Some(1), Some(2), Some(3)],
2795            array.iter().collect::<Vec<_>>()
2796        );
2797
2798        let sliced = array.slice(1, 2);
2799        assert_eq!(vec![Some(2), Some(3)], sliced.iter().collect::<Vec<_>>());
2800
2801        let batch = RecordBatch::try_new(
2802            Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)])),
2803            vec![Arc::new(sliced)],
2804        )
2805        .expect("new batch");
2806
2807        let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
2808        writer.write(&batch).expect("write");
2809        let outbuf = writer.into_inner().expect("inner");
2810
2811        let mut reader = StreamReader::try_new(&outbuf[..], None).expect("new reader");
2812        let read_batch = reader.next().unwrap().expect("read batch");
2813
2814        let read_array: &UInt32Array = read_batch.column(0).as_primitive();
2815        assert_eq!(
2816            vec![Some(2), Some(3)],
2817            read_array.iter().collect::<Vec<_>>()
2818        );
2819    }
2820
2821    #[test]
2822    fn test_large_slice_uint32() {
2823        ensure_roundtrip(Arc::new(UInt32Array::from_iter(
2824            (0..8000).map(|i| if i % 2 == 0 { Some(i) } else { None }),
2825        )));
2826    }
2827
2828    #[test]
2829    fn test_large_slice_string() {
2830        let strings: Vec<_> = (0..8000)
2831            .map(|i| {
2832                if i % 2 == 0 {
2833                    Some(format!("value{i}"))
2834                } else {
2835                    None
2836                }
2837            })
2838            .collect();
2839
2840        ensure_roundtrip(Arc::new(StringArray::from(strings)));
2841    }
2842
2843    #[test]
2844    fn test_large_slice_string_list() {
2845        let mut ls = ListBuilder::new(StringBuilder::new());
2846
2847        let mut s = String::new();
2848        for row_number in 0..8000 {
2849            if row_number % 2 == 0 {
2850                for list_element in 0..1000 {
2851                    s.clear();
2852                    use std::fmt::Write;
2853                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
2854                    ls.values().append_value(&s);
2855                }
2856                ls.append(true)
2857            } else {
2858                ls.append(false); // null
2859            }
2860        }
2861
2862        ensure_roundtrip(Arc::new(ls.finish()));
2863    }
2864
2865    #[test]
2866    fn test_large_slice_string_list_of_lists() {
2867        // This test specifically verifies reencode_offsets, which looks at both the starting
2868        // offset and the data offset, so we need a dataset where the starting offset is zero
2869        // but the data offset is not.
2870        let mut ls = ListBuilder::new(ListBuilder::new(StringBuilder::new()));
2871
2872        for _ in 0..4000 {
2873            ls.values().append(true);
2874            ls.append(true)
2875        }
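        // These 4000 leading rows contain only empty inner lists, so the string offsets stay at
        // zero; once ensure_roundtrip slices off the first row, the child's starting offset is
        // still zero while its position in the offsets buffer is not.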
2876
2877        let mut s = String::new();
2878        for row_number in 0..4000 {
2879            if row_number % 2 == 0 {
2880                for list_element in 0..1000 {
2881                    s.clear();
2882                    use std::fmt::Write;
2883                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
2884                    ls.values().values().append_value(&s);
2885                }
2886                ls.values().append(true);
2887                ls.append(true)
2888            } else {
2889                ls.append(false); // null
2890            }
2891        }
2892
2893        ensure_roundtrip(Arc::new(ls.finish()));
2894    }
2895
2896    /// Read/write a record batch to a File and Stream and ensure it is the same at the output
2897    fn ensure_roundtrip(array: ArrayRef) {
2898        let num_rows = array.len();
2899        let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
2900        // take off the first element
2901        let sliced_batch = orig_batch.slice(1, num_rows - 1);
2902
2903        let schema = orig_batch.schema();
2904        let stream_data = {
2905            let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
2906            writer.write(&sliced_batch).unwrap();
2907            writer.into_inner().unwrap()
2908        };
2909        let read_batch = {
2910            let projection = None;
2911            let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
2912            reader
2913                .next()
2914                .expect("expect no errors reading batch")
2915                .expect("expect batch")
2916        };
2917        assert_eq!(sliced_batch, read_batch);
2918
2919        let file_data = {
2920            let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
2921            writer.write(&sliced_batch).unwrap();
2922            writer.into_inner().unwrap().into_inner().unwrap()
2923        };
2924        let read_batch = {
2925            let projection = None;
2926            let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
2927            reader
2928                .next()
2929                .expect("expect no errors reading batch")
2930                .expect("expect batch")
2931        };
2932        assert_eq!(sliced_batch, read_batch);
2935    }
2936
2937    #[test]
2938    fn encode_bools_slice() {
2939        // Test case for https://github.com/apache/arrow-rs/issues/3496
2940        assert_bool_roundtrip([true, false], 1, 1);
2941
2942        // slice somewhere in the middle
2943        assert_bool_roundtrip(
2944            [
2945                true, false, true, true, false, false, true, true, true, false, false, false, true,
2946                true, true, true, false, false, false, false, true, true, true, true, true, false,
2947                false, false, false, false,
2948            ],
2949            13,
2950            17,
2951        );
2952
2953        // start at byte boundary, end in the middle
2954        assert_bool_roundtrip(
2955            [
2956                true, false, true, true, false, false, true, true, true, false, false, false,
2957            ],
2958            8,
2959            2,
2960        );
2961
2962        // start and stop at a byte boundary
2963        assert_bool_roundtrip(
2964            [
2965                true, false, true, true, false, false, true, true, true, false, false, false, true,
2966                true, true, true, true, false, false, false, false, false,
2967            ],
2968            8,
2969            8,
2970        );
2971    }
2972
2973    fn assert_bool_roundtrip<const N: usize>(bools: [bool; N], offset: usize, length: usize) {
2974        let val_bool_field = Field::new("val", DataType::Boolean, false);
2975
2976        let schema = Arc::new(Schema::new(vec![val_bool_field]));
2977
2978        let bools = BooleanArray::from(bools.to_vec());
2979
2980        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(bools)]).unwrap();
2981        let batch = batch.slice(offset, length);
2982
2983        let data = serialize_stream(&batch);
2984        let batch2 = deserialize_stream(data);
2985        assert_eq!(batch, batch2);
2986    }
2987
2988    #[test]
2989    fn test_run_array_unslice() {
2990        let total_len = 80;
2991        let vals: Vec<Option<i32>> = vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
2992        let repeats: Vec<usize> = vec![3, 4, 1, 2];
2993        let mut input_array: Vec<Option<i32>> = Vec::with_capacity(total_len);
2994        for ix in 0_usize..32 {
2995            let repeat: usize = repeats[ix % repeats.len()];
2996            let val: Option<i32> = vals[ix % vals.len()];
2997            input_array.resize(input_array.len() + repeat, val);
2998        }
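        // 32 iterations over repeats [3, 4, 1, 2] make 8 full cycles of 3 + 4 + 1 + 2 = 10 rows,
        // i.e. 80 rows in total, matching total_len.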
2999
3000        // Encode the input_array to run array
3001        let mut builder =
3002            PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
3003        builder.extend(input_array.iter().copied());
3004        let run_array = builder.finish();
3005
3006        // test for all slice lengths.
3007        for slice_len in 1..=total_len {
3008            // test for offset = 0, slice length = slice_len
3009            let sliced_run_array: RunArray<Int16Type> =
3010                run_array.slice(0, slice_len).into_data().into();
3011
3012            // Create unsliced run array.
3013            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
3014            let typed = unsliced_run_array
3015                .downcast::<PrimitiveArray<Int32Type>>()
3016                .unwrap();
3017            let expected: Vec<Option<i32>> = input_array.iter().take(slice_len).copied().collect();
3018            let actual: Vec<Option<i32>> = typed.into_iter().collect();
3019            assert_eq!(expected, actual);
3020
3021            // test for offset = total_len - slice_len, length = slice_len
3022            let sliced_run_array: RunArray<Int16Type> = run_array
3023                .slice(total_len - slice_len, slice_len)
3024                .into_data()
3025                .into();
3026
3027            // Create unsliced run array.
3028            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
3029            let typed = unsliced_run_array
3030                .downcast::<PrimitiveArray<Int32Type>>()
3031                .unwrap();
3032            let expected: Vec<Option<i32>> = input_array
3033                .iter()
3034                .skip(total_len - slice_len)
3035                .copied()
3036                .collect();
3037            let actual: Vec<Option<i32>> = typed.into_iter().collect();
3038            assert_eq!(expected, actual);
3039        }
3040    }
3041
3042    fn generate_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3043        let mut ls = GenericListBuilder::<O, _>::new(UInt32Builder::new());
3044
3045        for i in 0..100_000 {
3046            for value in [i, i, i] {
3047                ls.values().append_value(value);
3048            }
3049            ls.append(true)
3050        }
3051
3052        ls.finish()
3053    }
3054
3055    fn generate_utf8view_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3056        let mut ls = GenericListBuilder::<O, _>::new(StringViewBuilder::new());
3057
3058        for i in 0..100_000 {
3059            for value in [
3060                format!("value{}", i),
3061                format!("value{}", i),
3062                format!("value{}", i),
3063            ] {
3064                ls.values().append_value(&value);
3065            }
3066            ls.append(true)
3067        }
3068
3069        ls.finish()
3070    }
3071
3072    fn generate_string_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3073        let mut ls = GenericListBuilder::<O, _>::new(StringBuilder::new());
3074
3075        for i in 0..100_000 {
3076            for value in [
3077                format!("value{}", i),
3078                format!("value{}", i),
3079                format!("value{}", i),
3080            ] {
3081                ls.values().append_value(&value);
3082            }
3083            ls.append(true)
3084        }
3085
3086        ls.finish()
3087    }
3088
3089    fn generate_nested_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3090        let mut ls =
3091            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
3092
3093        for _i in 0..10_000 {
3094            for j in 0..10 {
3095                for value in [j, j, j, j] {
3096                    ls.values().values().append_value(value);
3097                }
3098                ls.values().append(true)
3099            }
3100            ls.append(true);
3101        }
3102
3103        ls.finish()
3104    }
3105
3106    fn generate_nested_list_data_starting_at_zero<O: OffsetSizeTrait>() -> GenericListArray<O> {
3107        let mut ls =
3108            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
3109
3110        for _i in 0..999 {
3111            ls.values().append(true);
3112            ls.append(true);
3113        }
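        // 999 leading rows of empty inner lists keep every value offset at zero before any real
        // data is appended below.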
3114
3115        for j in 0..10 {
3116            for value in [j, j, j, j] {
3117                ls.values().values().append_value(value);
3118            }
3119            ls.values().append(true)
3120        }
3121        ls.append(true);
3122
3123        for i in 0..9_000 {
3124            for j in 0..10 {
3125                for value in [i + j, i + j, i + j, i + j] {
3126                    ls.values().values().append_value(value);
3127                }
3128                ls.values().append(true)
3129            }
3130            ls.append(true);
3131        }
3132
3133        ls.finish()
3134    }
3135
3136    fn generate_map_array_data() -> MapArray {
3137        let keys_builder = UInt32Builder::new();
3138        let values_builder = UInt32Builder::new();
3139
3140        let mut builder = MapBuilder::new(None, keys_builder, values_builder);
3141
3142        for i in 0..100_000 {
3143            for _j in 0..3 {
3144                builder.keys().append_value(i);
3145                builder.values().append_value(i * 2);
3146            }
3147            builder.append(true).unwrap();
3148        }
3149
3150        builder.finish()
3151    }
3152
3153    #[test]
3154    fn reencode_offsets_when_first_offset_is_not_zero() {
3155        let original_list = generate_list_data::<i32>();
3156        let original_data = original_list.into_data();
3157        let slice_data = original_data.slice(75, 7);
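        // generate_list_data appends 3 values per list, so rows 75..82 cover child values
        // [225, 246): the rebased offsets should be [0, 3, ..., 21] with original_start = 225.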
3158        let (new_offsets, original_start, length) =
3159            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
3160        assert_eq!(
3161            vec![0, 3, 6, 9, 12, 15, 18, 21],
3162            new_offsets.typed_data::<i32>()
3163        );
3164        assert_eq!(225, original_start);
3165        assert_eq!(21, length);
3166    }
3167
3168    #[test]
3169    fn reencode_offsets_when_first_offset_is_zero() {
3170        let mut ls = GenericListBuilder::<i32, _>::new(UInt32Builder::new());
3171        // ls = [[], [35, 42]]
3172        ls.append(true);
3173        ls.values().append_value(35);
3174        ls.values().append_value(42);
3175        ls.append(true);
3176        let original_list = ls.finish();
3177        let original_data = original_list.into_data();
3178
3179        let slice_data = original_data.slice(1, 1);
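        // Slicing off the empty first list leaves a view whose first offset is already 0 even
        // though the view starts one entry into the offsets buffer, the case this test targets.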
3180        let (new_offsets, original_start, length) =
3181            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
3182        assert_eq!(vec![0, 2], new_offsets.typed_data::<i32>());
3183        assert_eq!(0, original_start);
3184        assert_eq!(2, length);
3185    }
3186
3187    /// Ensure that the full and sliced versions both round-trip (serialize/deserialize) back to the original input.
3188    /// Also ensure the serialized sliced version is significantly smaller than the serialized full batch.
3189    fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {
3190        // test both full and sliced versions
3191        let in_sliced = in_batch.slice(999, 1);
3192
3193        let bytes_batch = serialize_file(&in_batch);
3194        let bytes_sliced = serialize_file(&in_sliced);
3195
3196        // serializing 1 row should be significantly smaller than serializing 100,000
3197        assert!(bytes_sliced.len() < (bytes_batch.len() / expected_size_factor));
3198
3199        // ensure both are still valid and equal to originals
3200        let out_batch = deserialize_file(bytes_batch);
3201        assert_eq!(in_batch, out_batch);
3202
3203        let out_sliced = deserialize_file(bytes_sliced);
3204        assert_eq!(in_sliced, out_sliced);
3205    }
3206
3207    #[test]
3208    fn encode_lists() {
3209        let val_inner = Field::new_list_field(DataType::UInt32, true);
3210        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
3211        let schema = Arc::new(Schema::new(vec![val_list_field]));
3212
3213        let values = Arc::new(generate_list_data::<i32>());
3214
3215        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3216        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3217    }
3218
3219    #[test]
3220    fn encode_empty_list() {
3221        let val_inner = Field::new_list_field(DataType::UInt32, true);
3222        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
3223        let schema = Arc::new(Schema::new(vec![val_list_field]));
3224
3225        let values = Arc::new(generate_list_data::<i32>());
3226
3227        let in_batch = RecordBatch::try_new(schema, vec![values])
3228            .unwrap()
3229            .slice(999, 0);
3230        let out_batch = deserialize_file(serialize_file(&in_batch));
3231        assert_eq!(in_batch, out_batch);
3232    }
3233
3234    #[test]
3235    fn encode_large_lists() {
3236        let val_inner = Field::new_list_field(DataType::UInt32, true);
3237        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3238        let schema = Arc::new(Schema::new(vec![val_list_field]));
3239
3240        let values = Arc::new(generate_list_data::<i64>());
3241
3242        // ensure the full and sliced versions both round-trip back to the original input
3243        // and that the serialized sliced version is significantly smaller than the full one
3244        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3245        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3246    }
3247
3248    #[test]
3249    fn encode_large_lists_non_zero_offset() {
3250        let val_inner = Field::new_list_field(DataType::UInt32, true);
3251        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3252        let schema = Arc::new(Schema::new(vec![val_list_field]));
3253
3254        let values = Arc::new(generate_list_data::<i64>());
3255
3256        check_sliced_list_array(schema, values);
3257    }
3258
3259    #[test]
3260    fn encode_large_lists_string_non_zero_offset() {
3261        let val_inner = Field::new_list_field(DataType::Utf8, true);
3262        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3263        let schema = Arc::new(Schema::new(vec![val_list_field]));
3264
3265        let values = Arc::new(generate_string_list_data::<i64>());
3266
3267        check_sliced_list_array(schema, values);
3268    }
3269
3270    #[test]
3271    fn encode_large_list_string_view_non_zero_offset() {
3272        let val_inner = Field::new_list_field(DataType::Utf8View, true);
3273        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3274        let schema = Arc::new(Schema::new(vec![val_list_field]));
3275
3276        let values = Arc::new(generate_utf8view_list_data::<i64>());
3277
3278        check_sliced_list_array(schema, values);
3279    }
3280
3281    fn check_sliced_list_array(schema: Arc<Schema>, values: Arc<GenericListArray<i64>>) {
3282        for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3283            let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3284                .unwrap()
3285                .slice(offset, len);
3286            let out_batch = deserialize_file(serialize_file(&in_batch));
3287            assert_eq!(in_batch, out_batch);
3288        }
3289    }
3290
3291    #[test]
3292    fn encode_nested_lists() {
3293        let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
3294        let inner_list_field = Arc::new(Field::new_list_field(DataType::List(inner_int), true));
3295        let list_field = Field::new("val", DataType::List(inner_list_field), true);
3296        let schema = Arc::new(Schema::new(vec![list_field]));
3297
3298        let values = Arc::new(generate_nested_list_data::<i32>());
3299
3300        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3301        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3302    }
3303
3304    #[test]
3305    fn encode_nested_lists_starting_at_zero() {
3306        let inner_int = Arc::new(Field::new("item", DataType::UInt32, true));
3307        let inner_list_field = Arc::new(Field::new("item", DataType::List(inner_int), true));
3308        let list_field = Field::new("val", DataType::List(inner_list_field), true);
3309        let schema = Arc::new(Schema::new(vec![list_field]));
3310
3311        let values = Arc::new(generate_nested_list_data_starting_at_zero::<i32>());
3312
3313        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3314        roundtrip_ensure_sliced_smaller(in_batch, 1);
3315    }
3316
3317    #[test]
3318    fn encode_map_array() {
3319        let keys = Arc::new(Field::new("keys", DataType::UInt32, false));
3320        let values = Arc::new(Field::new("values", DataType::UInt32, true));
3321        let map_field = Field::new_map("map", "entries", keys, values, false, true);
3322        let schema = Arc::new(Schema::new(vec![map_field]));
3323
3324        let values = Arc::new(generate_map_array_data());
3325
3326        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3327        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3328    }
3329
3330    fn generate_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
3331        let mut builder = GenericListViewBuilder::<O, _>::new(UInt32Builder::new());
3332
3333        for i in 0u32..100_000 {
3334            if i.is_multiple_of(10_000) {
3335                builder.append(false);
3336                continue;
3337            }
3338            for value in [i, i, i] {
3339                builder.values().append_value(value);
3340            }
3341            builder.append(true);
3342        }
3343
3344        builder.finish()
3345    }
3346
3347    #[test]
3348    fn encode_list_view_arrays() {
3349        let val_inner = Field::new_list_field(DataType::UInt32, true);
3350        let val_field = Field::new("val", DataType::ListView(Arc::new(val_inner)), true);
3351        let schema = Arc::new(Schema::new(vec![val_field]));
3352
3353        let values = Arc::new(generate_list_view_data::<i32>());
3354
3355        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3356        let out_batch = deserialize_file(serialize_file(&in_batch));
3357        assert_eq!(in_batch, out_batch);
3358    }
3359
3360    #[test]
3361    fn encode_large_list_view_arrays() {
3362        let val_inner = Field::new_list_field(DataType::UInt32, true);
3363        let val_field = Field::new("val", DataType::LargeListView(Arc::new(val_inner)), true);
3364        let schema = Arc::new(Schema::new(vec![val_field]));
3365
3366        let values = Arc::new(generate_list_view_data::<i64>());
3367
3368        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3369        let out_batch = deserialize_file(serialize_file(&in_batch));
3370        assert_eq!(in_batch, out_batch);
3371    }
3372
3373    #[test]
3374    fn check_sliced_list_view_array() {
3375        let inner = Field::new_list_field(DataType::UInt32, true);
3376        let field = Field::new("val", DataType::ListView(Arc::new(inner)), true);
3377        let schema = Arc::new(Schema::new(vec![field]));
3378        let values = Arc::new(generate_list_view_data::<i32>());
3379
3380        for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3381            let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3382                .unwrap()
3383                .slice(offset, len);
3384            let out_batch = deserialize_file(serialize_file(&in_batch));
3385            assert_eq!(in_batch, out_batch);
3386        }
3387    }
3388
3389    #[test]
3390    fn check_sliced_large_list_view_array() {
3391        let inner = Field::new_list_field(DataType::UInt32, true);
3392        let field = Field::new("val", DataType::LargeListView(Arc::new(inner)), true);
3393        let schema = Arc::new(Schema::new(vec![field]));
3394        let values = Arc::new(generate_list_view_data::<i64>());
3395
3396        for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3397            let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3398                .unwrap()
3399                .slice(offset, len);
3400            let out_batch = deserialize_file(serialize_file(&in_batch));
3401            assert_eq!(in_batch, out_batch);
3402        }
3403    }
3404
3405    fn generate_nested_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
3406        let inner_builder = UInt32Builder::new();
3407        let middle_builder = GenericListViewBuilder::<O, _>::new(inner_builder);
3408        let mut outer_builder = GenericListViewBuilder::<O, _>::new(middle_builder);
3409
3410        for i in 0u32..10_000 {
3411            if i.is_multiple_of(1_000) {
3412                outer_builder.append(false);
3413                continue;
3414            }
3415
3416            for _ in 0..3 {
3417                for value in [i, i + 1, i + 2] {
3418                    outer_builder.values().values().append_value(value);
3419                }
3420                outer_builder.values().append(true);
3421            }
3422            outer_builder.append(true);
3423        }
3424
3425        outer_builder.finish()
3426    }
3427
3428    #[test]
3429    fn encode_nested_list_views() {
3430        let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
3431        let inner_list_field = Arc::new(Field::new_list_field(DataType::ListView(inner_int), true));
3432        let list_field = Field::new("val", DataType::ListView(inner_list_field), true);
3433        let schema = Arc::new(Schema::new(vec![list_field]));
3434
3435        let values = Arc::new(generate_nested_list_view_data::<i32>());
3436
3437        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3438        let out_batch = deserialize_file(serialize_file(&in_batch));
3439        assert_eq!(in_batch, out_batch);
3440    }
3441
3442    fn test_roundtrip_list_view_of_dict_impl<OffsetSize: OffsetSizeTrait, U: ArrowNativeType>(
3443        list_data_type: DataType,
3444        offsets: &[U; 5],
3445        sizes: &[U; 4],
3446    ) {
3447        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3448        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
3449        let dict_array = DictionaryArray::new(keys, Arc::new(values));
3450        let dict_data = dict_array.to_data();
3451
3452        let value_offsets = Buffer::from_slice_ref(offsets);
3453        let value_sizes = Buffer::from_slice_ref(sizes);
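        // ListView arrays carry explicit offset and size buffers, so each view independently
        // references a run of the dictionary-encoded child values.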
3454
3455        let list_data = ArrayData::builder(list_data_type)
3456            .len(4)
3457            .add_buffer(value_offsets)
3458            .add_buffer(value_sizes)
3459            .add_child_data(dict_data)
3460            .build()
3461            .unwrap();
3462        let list_view_array = GenericListViewArray::<OffsetSize>::from(list_data);
3463
3464        let schema = Arc::new(Schema::new(vec![Field::new(
3465            "f1",
3466            list_view_array.data_type().clone(),
3467            false,
3468        )]));
3469        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(list_view_array)]).unwrap();
3470
3471        let output_batch = deserialize_file(serialize_file(&input_batch));
3472        assert_eq!(input_batch, output_batch);
3473
3474        let output_batch = deserialize_stream(serialize_stream(&input_batch));
3475        assert_eq!(input_batch, output_batch);
3476    }
3477
3478    #[test]
3479    fn test_roundtrip_list_view_of_dict() {
3480        #[allow(deprecated)]
3481        let list_data_type = DataType::ListView(Arc::new(Field::new_dict(
3482            "item",
3483            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3484            true,
3485            1,
3486            false,
3487        )));
3488        let offsets: &[i32; 5] = &[0, 2, 4, 4, 7];
3489        let sizes: &[i32; 4] = &[2, 2, 0, 3];
3490        test_roundtrip_list_view_of_dict_impl::<i32, i32>(list_data_type, offsets, sizes);
3491    }
3492
3493    #[test]
3494    fn test_roundtrip_large_list_view_of_dict() {
3495        #[allow(deprecated)]
3496        let list_data_type = DataType::LargeListView(Arc::new(Field::new_dict(
3497            "item",
3498            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3499            true,
3500            2,
3501            false,
3502        )));
3503        let offsets: &[i64; 5] = &[0, 2, 4, 4, 7];
3504        let sizes: &[i64; 4] = &[2, 2, 0, 3];
3505        test_roundtrip_list_view_of_dict_impl::<i64, i64>(list_data_type, offsets, sizes);
3506    }
3507
3508    #[test]
3509    fn test_roundtrip_sliced_list_view_of_dict() {
3510        #[allow(deprecated)]
3511        let list_data_type = DataType::ListView(Arc::new(Field::new_dict(
3512            "item",
3513            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3514            true,
3515            3,
3516            false,
3517        )));
3518
3519        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3520        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2, 1, 0, 3, 2, 1]);
3521        let dict_array = DictionaryArray::new(keys, Arc::new(values));
3522        let dict_data = dict_array.to_data();
3523
3524        let offsets: &[i32; 7] = &[0, 2, 4, 4, 7, 9, 12];
3525        let sizes: &[i32; 6] = &[2, 2, 0, 3, 2, 3];
3526        let value_offsets = Buffer::from_slice_ref(offsets);
3527        let value_sizes = Buffer::from_slice_ref(sizes);
3528
3529        let list_data = ArrayData::builder(list_data_type)
3530            .len(6)
3531            .add_buffer(value_offsets)
3532            .add_buffer(value_sizes)
3533            .add_child_data(dict_data)
3534            .build()
3535            .unwrap();
3536        let list_view_array = GenericListViewArray::<i32>::from(list_data);
3537
3538        let schema = Arc::new(Schema::new(vec![Field::new(
3539            "f1",
3540            list_view_array.data_type().clone(),
3541            false,
3542        )]));
3543        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(list_view_array)]).unwrap();
3544
3545        let sliced_batch = input_batch.slice(1, 4);
3546
3547        let output_batch = deserialize_file(serialize_file(&sliced_batch));
3548        assert_eq!(sliced_batch, output_batch);
3549
3550        let output_batch = deserialize_stream(serialize_stream(&sliced_batch));
3551        assert_eq!(sliced_batch, output_batch);
3552    }
3553
3554    #[test]
3555    fn test_roundtrip_dense_union_of_dict() {
3556        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3557        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
3558        let dict_array = DictionaryArray::new(keys, Arc::new(values));
3559
3560        #[allow(deprecated)]
3561        let dict_field = Arc::new(Field::new_dict(
3562            "dict",
3563            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3564            true,
3565            1,
3566            false,
3567        ));
3568        let int_field = Arc::new(Field::new("int", DataType::Int32, false));
3569        let union_fields = UnionFields::try_new(vec![0, 1], vec![dict_field, int_field]).unwrap();
3570
3571        let types = ScalarBuffer::from(vec![0i8, 0, 1, 0, 1, 0, 0]);
3572        let offsets = ScalarBuffer::from(vec![0i32, 1, 0, 2, 1, 3, 4]);
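        // type_ids (0 = dict, 1 = int) choose the child for each row; the offsets index into that
        // child, so the dictionary supplies five rows and the int array two.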
3573
3574        let int_array = Int32Array::from(vec![100, 200]);
3575
3576        let union = UnionArray::try_new(
3577            union_fields.clone(),
3578            types,
3579            Some(offsets),
3580            vec![Arc::new(dict_array), Arc::new(int_array)],
3581        )
3582        .unwrap();
3583
3584        let schema = Arc::new(Schema::new(vec![Field::new(
3585            "union",
3586            DataType::Union(union_fields, UnionMode::Dense),
3587            false,
3588        )]));
3589        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
3590
3591        let output_batch = deserialize_file(serialize_file(&input_batch));
3592        assert_eq!(input_batch, output_batch);
3593
3594        let output_batch = deserialize_stream(serialize_stream(&input_batch));
3595        assert_eq!(input_batch, output_batch);
3596    }
3597
3598    #[test]
3599    fn test_roundtrip_sparse_union_of_dict() {
3600        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3601        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
3602        let dict_array = DictionaryArray::new(keys, Arc::new(values));
3603
3604        #[allow(deprecated)]
3605        let dict_field = Arc::new(Field::new_dict(
3606            "dict",
3607            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3608            true,
3609            2,
3610            false,
3611        ));
3612        let int_field = Arc::new(Field::new("int", DataType::Int32, false));
3613        let union_fields = UnionFields::try_new(vec![0, 1], vec![dict_field, int_field]).unwrap();
3614
3615        let types = ScalarBuffer::from(vec![0i8, 0, 1, 0, 1, 0, 0]);
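        // Sparse unions carry no offsets buffer: each child spans the full length and type_ids
        // select which child supplies each row.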
3616
3617        let int_array = Int32Array::from(vec![0, 0, 100, 0, 200, 0, 0]);
3618
3619        let union = UnionArray::try_new(
3620            union_fields.clone(),
3621            types,
3622            None,
3623            vec![Arc::new(dict_array), Arc::new(int_array)],
3624        )
3625        .unwrap();
3626
3627        let schema = Arc::new(Schema::new(vec![Field::new(
3628            "union",
3629            DataType::Union(union_fields, UnionMode::Sparse),
3630            false,
3631        )]));
3632        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
3633
3634        let output_batch = deserialize_file(serialize_file(&input_batch));
3635        assert_eq!(input_batch, output_batch);
3636
3637        let output_batch = deserialize_stream(serialize_stream(&input_batch));
3638        assert_eq!(input_batch, output_batch);
3639    }
3640
3641    #[test]
3642    fn test_roundtrip_map_with_dict_keys() {
3643        // Building a map array is a bit involved. We first build a struct array that has a key and
3644        // value field and then use that to build the actual map array.
3645        let key_values = StringArray::from(vec!["key_a", "key_b", "key_c"]);
3646        let keys = Int32Array::from_iter_values([0, 1, 2, 0, 1, 0]);
3647        let dict_keys = DictionaryArray::new(keys, Arc::new(key_values));
3648
3649        let values = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
3650
3651        #[allow(deprecated)]
3652        let entries_field = Arc::new(Field::new(
3653            "entries",
3654            DataType::Struct(
3655                vec![
3656                    Field::new_dict(
3657                        "key",
3658                        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3659                        false,
3660                        1,
3661                        false,
3662                    ),
3663                    Field::new("value", DataType::Int32, true),
3664                ]
3665                .into(),
3666            ),
3667            false,
3668        ));
3669
3670        let entries = StructArray::from(vec![
3671            (
3672                Arc::new(Field::new(
3673                    "key",
3674                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3675                    false,
3676                )),
3677                Arc::new(dict_keys) as ArrayRef,
3678            ),
3679            (
3680                Arc::new(Field::new("value", DataType::Int32, true)),
3681                Arc::new(values) as ArrayRef,
3682            ),
3683        ]);
3684
3685        let offsets = Buffer::from_slice_ref([0i32, 2, 4, 6]);
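        // Offsets [0, 2, 4, 6] split the six entries into three maps of two key/value pairs each.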
3686
3687        let map_data = ArrayData::builder(DataType::Map(entries_field, false))
3688            .len(3)
3689            .add_buffer(offsets)
3690            .add_child_data(entries.into_data())
3691            .build()
3692            .unwrap();
3693        let map_array = MapArray::from(map_data);
3694
3695        let schema = Arc::new(Schema::new(vec![Field::new(
3696            "map",
3697            map_array.data_type().clone(),
3698            false,
3699        )]));
3700        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(map_array)]).unwrap();
3701
3702        let output_batch = deserialize_file(serialize_file(&input_batch));
3703        assert_eq!(input_batch, output_batch);
3704
3705        let output_batch = deserialize_stream(serialize_stream(&input_batch));
3706        assert_eq!(input_batch, output_batch);
3707    }
3708
3709    #[test]
3710    fn test_roundtrip_map_with_dict_values() {
3711        // Building a map array is a bit involved. We first build a struct array that has a key and
3712        // value field and then use that to build the actual map array.
3713        let keys = StringArray::from(vec!["a", "b", "c", "d", "e", "f"]);
3714
3715        let value_values = StringArray::from(vec!["val_x", "val_y", "val_z"]);
3716        let value_keys = Int32Array::from_iter_values([0, 1, 2, 0, 1, 0]);
3717        let dict_values = DictionaryArray::new(value_keys, Arc::new(value_values));
3718
3719        #[allow(deprecated)]
3720        let entries_field = Arc::new(Field::new(
3721            "entries",
3722            DataType::Struct(
3723                vec![
3724                    Field::new("key", DataType::Utf8, false),
3725                    Field::new_dict(
3726                        "value",
3727                        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3728                        true,
3729                        2,
3730                        false,
3731                    ),
3732                ]
3733                .into(),
3734            ),
3735            false,
3736        ));
3737
3738        let entries = StructArray::from(vec![
3739            (
3740                Arc::new(Field::new("key", DataType::Utf8, false)),
3741                Arc::new(keys) as ArrayRef,
3742            ),
3743            (
3744                Arc::new(Field::new(
3745                    "value",
3746                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3747                    true,
3748                )),
3749                Arc::new(dict_values) as ArrayRef,
3750            ),
3751        ]);
3752
3753        let offsets = Buffer::from_slice_ref([0i32, 2, 4, 6]);
3754
3755        let map_data = ArrayData::builder(DataType::Map(entries_field, false))
3756            .len(3)
3757            .add_buffer(offsets)
3758            .add_child_data(entries.into_data())
3759            .build()
3760            .unwrap();
3761        let map_array = MapArray::from(map_data);
3762
3763        let schema = Arc::new(Schema::new(vec![Field::new(
3764            "map",
3765            map_array.data_type().clone(),
3766            false,
3767        )]));
3768        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(map_array)]).unwrap();
3769
3770        let output_batch = deserialize_file(serialize_file(&input_batch));
3771        assert_eq!(input_batch, output_batch);
3772
3773        let output_batch = deserialize_stream(serialize_stream(&input_batch));
3774        assert_eq!(input_batch, output_batch);
3775    }
3776
3777    #[test]
3778    fn test_decimal128_alignment16_is_sufficient() {
3779        const IPC_ALIGNMENT: usize = 16;
3780
3781        // Test a bunch of different dimensions to ensure alignment is never an issue.
3782        // For example, if we only test `num_cols = 1` then even with alignment 8 this
3783        // test would _happen_ to pass, even though for different dimensions like
3784        // `num_cols = 2` it would fail.
3785        for num_cols in [1, 2, 3, 17, 50, 73, 99] {
3786            let num_rows = (num_cols * 7 + 11) % 100; // Deterministic swizzle
3787
3788            let mut fields = Vec::new();
3789            let mut arrays = Vec::new();
3790            for i in 0..num_cols {
3791                let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
3792                let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
3793                fields.push(field);
3794                arrays.push(Arc::new(array) as Arc<dyn Array>);
3795            }
3796            let schema = Schema::new(fields);
3797            let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();
3798
3799            let mut writer = FileWriter::try_new_with_options(
3800                Vec::new(),
3801                batch.schema_ref(),
3802                IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
3803            )
3804            .unwrap();
3805            writer.write(&batch).unwrap();
3806            writer.finish().unwrap();
3807
3808            let out: Vec<u8> = writer.into_inner().unwrap();
3809
3810            let buffer = Buffer::from_vec(out);
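            // An Arrow IPC file ends with the footer, a 4-byte little-endian footer length and
            // the 6-byte "ARROW1" magic, so the trailer occupies the last 10 bytes.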
3811            let trailer_start = buffer.len() - 10;
3812            let footer_len =
3813                read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
3814            let footer =
3815                root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
3816
3817            let schema = fb_to_schema(footer.schema().unwrap());
3818
3819            // Importantly we set `require_alignment`, checking that 16-byte alignment is sufficient
3820            // for `read_record_batch` later on to read the data in a zero-copy manner.
3821            let decoder =
3822                FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);
3823
3824            let batches = footer.recordBatches().unwrap();
3825
3826            let block = batches.get(0);
3827            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
3828            let data = buffer.slice_with_length(block.offset() as _, block_len);
3829
3830            let batch2 = decoder.read_record_batch(block, &data).unwrap().unwrap();
3831
3832            assert_eq!(batch, batch2);
3833        }
3834    }
3835
3836    #[test]
3837    fn test_decimal128_alignment8_is_unaligned() {
3838        const IPC_ALIGNMENT: usize = 8;
3839
3840        let num_cols = 2;
3841        let num_rows = 1;
3842
3843        let mut fields = Vec::new();
3844        let mut arrays = Vec::new();
3845        for i in 0..num_cols {
3846            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
3847            let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
3848            fields.push(field);
3849            arrays.push(Arc::new(array) as Arc<dyn Array>);
3850        }
3851        let schema = Schema::new(fields);
3852        let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();
3853
3854        let mut writer = FileWriter::try_new_with_options(
3855            Vec::new(),
3856            batch.schema_ref(),
3857            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
3858        )
3859        .unwrap();
3860        writer.write(&batch).unwrap();
3861        writer.finish().unwrap();
3862
3863        let out: Vec<u8> = writer.into_inner().unwrap();
3864
3865        let buffer = Buffer::from_vec(out);
3866        let trailer_start = buffer.len() - 10;
3867        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
3868        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
3869        let schema = fb_to_schema(footer.schema().unwrap());
3870
3871        // Importantly we set `require_alignment`, otherwise the error later is suppressed due to copying
3872        // to an aligned buffer in `ArrayDataBuilder.build_aligned`.
3873        let decoder =
3874            FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);
3875
3876        let batches = footer.recordBatches().unwrap();
3877
3878        let block = batches.get(0);
3879        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
3880        let data = buffer.slice_with_length(block.offset() as _, block_len);
3881
3882        let result = decoder.read_record_batch(block, &data);
3883
3884        let error = result.unwrap_err();
3885        assert_eq!(
3886            error.to_string(),
3887            "Invalid argument error: Misaligned buffers[0] in array of type Decimal128(38, 10), \
3888             offset from expected alignment of 16 by 8"
3889        );
3890    }
3891
3892    #[test]
3893    fn test_flush() {
3894        // We write a schema which is small enough to fit into a buffer and not get flushed,
3895        // and then force the write with .flush().
3896        let num_cols = 2;
3897        let mut fields = Vec::new();
3898        let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
3899        for i in 0..num_cols {
3900            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
3901            fields.push(field);
3902        }
3903        let schema = Schema::new(fields);
3904        let inner_stream_writer = BufWriter::with_capacity(1024, Vec::new());
3905        let inner_file_writer = BufWriter::with_capacity(1024, Vec::new());
3906        let mut stream_writer =
3907            StreamWriter::try_new_with_options(inner_stream_writer, &schema, options.clone())
3908                .unwrap();
3909        let mut file_writer =
3910            FileWriter::try_new_with_options(inner_file_writer, &schema, options).unwrap();
3911
3912        let stream_bytes_written_on_new = stream_writer.get_ref().get_ref().len();
3913        let file_bytes_written_on_new = file_writer.get_ref().get_ref().len();
3914        stream_writer.flush().unwrap();
3915        file_writer.flush().unwrap();
3916        let stream_bytes_written_on_flush = stream_writer.get_ref().get_ref().len();
3917        let file_bytes_written_on_flush = file_writer.get_ref().get_ref().len();
3918        let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
3919        // Finishing a stream writes the continuation bytes in MetadataVersion::V5 (4 bytes)
3920        // and then a length of 0 (4 bytes) for a total of 8 bytes.
3921        // Everything before that should have been flushed in the .flush() call.
3922        let expected_stream_flushed_bytes = stream_out.len() - 8;
3923        // A file write is the same as the stream write except for the leading magic string
3924        // ARROW1 plus padding, which is 8 bytes.
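        // ("ARROW1" itself is 6 bytes, padded with 2 bytes up to the 8-byte alignment boundary.)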
3925        let expected_file_flushed_bytes = expected_stream_flushed_bytes + 8;
3926
3927        assert!(
3928            stream_bytes_written_on_new < stream_bytes_written_on_flush,
3929            "this test makes no sense if flush is not actually required"
3930        );
3931        assert!(
3932            file_bytes_written_on_new < file_bytes_written_on_flush,
3933            "this test makes no sense if flush is not actually required"
3934        );
3935        assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
3936        assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
3937    }
3938
3939    #[test]
3940    fn test_roundtrip_list_of_fixed_list() -> Result<(), ArrowError> {
3941        let l1_type =
3942            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, false)), 3);
3943        let l2_type = DataType::List(Arc::new(Field::new("item", l1_type.clone(), false)));
3944
3945        let l0_builder = Float32Builder::new();
3946        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3).with_field(Arc::new(Field::new(
3947            "item",
3948            DataType::Float32,
3949            false,
3950        )));
3951        let mut l2_builder =
3952            ListBuilder::new(l1_builder).with_field(Arc::new(Field::new("item", l1_type, false)));
3953
3954        for point in [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]] {
3955            l2_builder.values().values().append_value(point[0]);
3956            l2_builder.values().values().append_value(point[1]);
3957            l2_builder.values().values().append_value(point[2]);
3958
3959            l2_builder.values().append(true);
3960        }
3961        l2_builder.append(true);
3962
3963        let point = [10., 11., 12.];
3964        l2_builder.values().values().append_value(point[0]);
3965        l2_builder.values().values().append_value(point[1]);
3966        l2_builder.values().values().append_value(point[2]);
3967
3968        l2_builder.values().append(true);
3969        l2_builder.append(true);
3970
3971        let array = Arc::new(l2_builder.finish()) as ArrayRef;
3972
3973        let schema = Arc::new(Schema::new_with_metadata(
3974            vec![Field::new("points", l2_type, false)],
3975            HashMap::default(),
3976        ));
3977
3978        // Test a variety of combinations that include zero and non-zero offsets
3979        // and slices covering either a portion of the array or the remainder of it
3980        test_slices(&array, &schema, 0, 1)?;
3981        test_slices(&array, &schema, 0, 2)?;
3982        test_slices(&array, &schema, 1, 1)?;
3983
3984        Ok(())
3985    }
3986
3987    #[test]
3988    fn test_roundtrip_list_of_fixed_list_w_nulls() -> Result<(), ArrowError> {
3989        let l0_builder = Float32Builder::new();
3990        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3);
3991        let mut l2_builder = ListBuilder::new(l1_builder);
3992
3993        for point in [
3994            [Some(1.0), Some(2.0), None],
3995            [Some(4.0), Some(5.0), Some(6.0)],
3996            [None, Some(8.0), Some(9.0)],
3997        ] {
3998            for p in point {
3999                match p {
4000                    Some(p) => l2_builder.values().values().append_value(p),
4001                    None => l2_builder.values().values().append_null(),
4002                }
4003            }
4004
4005            l2_builder.values().append(true);
4006        }
4007        l2_builder.append(true);
4008
4009        let point = [Some(10.), None, None];
4010        for p in point {
4011            match p {
4012                Some(p) => l2_builder.values().values().append_value(p),
4013                None => l2_builder.values().values().append_null(),
4014            }
4015        }
4016
4017        l2_builder.values().append(true);
4018        l2_builder.append(true);
4019
4020        let array = Arc::new(l2_builder.finish()) as ArrayRef;
4021
4022        let schema = Arc::new(Schema::new_with_metadata(
4023            vec![Field::new(
4024                "points",
4025                DataType::List(Arc::new(Field::new(
4026                    "item",
4027                    DataType::FixedSizeList(
4028                        Arc::new(Field::new("item", DataType::Float32, true)),
4029                        3,
4030                    ),
4031                    true,
4032                ))),
4033                true,
4034            )],
4035            HashMap::default(),
4036        ));
4037
4038        // Test a variety of combinations that include zero and non-zero offsets
4039        // and slices covering either a portion of the array or the remainder of it
4040        test_slices(&array, &schema, 0, 1)?;
4041        test_slices(&array, &schema, 0, 2)?;
4042        test_slices(&array, &schema, 1, 1)?;
4043
4044        Ok(())
4045    }
4046
4047    fn test_slices(
4048        parent_array: &ArrayRef,
4049        schema: &SchemaRef,
4050        offset: usize,
4051        length: usize,
4052    ) -> Result<(), ArrowError> {
4053        let subarray = parent_array.slice(offset, length);
4054        let original_batch = RecordBatch::try_new(schema.clone(), vec![subarray])?;
4055
4056        let mut bytes = Vec::new();
4057        let mut writer = StreamWriter::try_new(&mut bytes, schema)?;
4058        writer.write(&original_batch)?;
4059        writer.finish()?;
4060
4061        let mut cursor = std::io::Cursor::new(bytes);
4062        let mut reader = StreamReader::try_new(&mut cursor, None)?;
4063        let returned_batch = reader.next().unwrap()?;
4064
4065        assert_eq!(original_batch, returned_batch);
4066
4067        Ok(())
4068    }
4069
4070    #[test]
4071    fn test_roundtrip_fixed_list() -> Result<(), ArrowError> {
4072        let int_builder = Int64Builder::new();
4073        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3)
4074            .with_field(Arc::new(Field::new("item", DataType::Int64, false)));
4075
4076        for point in [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]] {
4077            fixed_list_builder.values().append_value(point[0]);
4078            fixed_list_builder.values().append_value(point[1]);
4079            fixed_list_builder.values().append_value(point[2]);
4080
4081            fixed_list_builder.append(true);
4082        }
4083
4084        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;
4085
4086        let schema = Arc::new(Schema::new_with_metadata(
4087            vec![Field::new(
4088                "points",
4089                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, false)), 3),
4090                false,
4091            )],
4092            HashMap::default(),
4093        ));
4094
4095        // Test a variety of combinations that include zero and non-zero offsets
4096        // and slices covering either a portion of the array or the remainder of it
4097        test_slices(&array, &schema, 0, 4)?;
4098        test_slices(&array, &schema, 0, 2)?;
4099        test_slices(&array, &schema, 1, 3)?;
4100        test_slices(&array, &schema, 2, 1)?;
4101
4102        Ok(())
4103    }
4104
4105    #[test]
4106    fn test_roundtrip_fixed_list_w_nulls() -> Result<(), ArrowError> {
4107        let int_builder = Int64Builder::new();
4108        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3);
4109
4110        for point in [
4111            [Some(1), Some(2), None],
4112            [Some(4), Some(5), Some(6)],
4113            [None, Some(8), Some(9)],
4114            [Some(10), None, None],
4115        ] {
4116            for p in point {
4117                match p {
4118                    Some(p) => fixed_list_builder.values().append_value(p),
4119                    None => fixed_list_builder.values().append_null(),
4120                }
4121            }
4122
4123            fixed_list_builder.append(true);
4124        }
4125
4126        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;
4127
4128        let schema = Arc::new(Schema::new_with_metadata(
4129            vec![Field::new(
4130                "points",
4131                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 3),
4132                true,
4133            )],
4134            HashMap::default(),
4135        ));
4136
4137        // Test a variety of combinations that include zero and non-zero offsets
4138        // and slices covering either a portion of the array or the remainder of it
4139        test_slices(&array, &schema, 0, 4)?;
4140        test_slices(&array, &schema, 0, 2)?;
4141        test_slices(&array, &schema, 1, 3)?;
4142        test_slices(&array, &schema, 2, 1)?;
4143
4144        Ok(())
4145    }
4146
4147    #[test]
4148    fn test_metadata_encoding_ordering() {
4149        fn create_hash() -> u64 {
4150            let metadata: HashMap<String, String> = [
4151                ("a", "1"), //
4152                ("b", "2"), //
4153                ("c", "3"), //
4154                ("d", "4"), //
4155                ("e", "5"), //
4156            ]
4157            .into_iter()
4158            .map(|(k, v)| (k.to_owned(), v.to_owned()))
4159            .collect();
4160
4161            // Set metadata on both the schema and a field within it.
4162            let schema = Arc::new(
4163                Schema::new(vec![
4164                    Field::new("a", DataType::Int64, true).with_metadata(metadata.clone()),
4165                ])
4166                .with_metadata(metadata)
4167                .clone(),
4168            );
4169            let batch = RecordBatch::new_empty(schema.clone());
4170
4171            let mut bytes = Vec::new();
4172            let mut w = StreamWriter::try_new(&mut bytes, batch.schema_ref()).unwrap();
4173            w.write(&batch).unwrap();
4174            w.finish().unwrap();
4175
4176            let mut h = std::hash::DefaultHasher::new();
4177            h.write(&bytes);
4178            h.finish()
4179        }
4180
4181        let expected = create_hash();
4182
4183        // Since HashMap iteration order is randomized and we cannot specify our
4184        // own Hasher for the map used for metadata, run the above code 20x and
4185        // verify the encoded bytes do not change. This is not perfect but it
4186        // should be good enough.
4187        let all_passed = (0..20).all(|_| create_hash() == expected);
4188        assert!(all_passed);
4189    }
4190
4191    #[test]
4192    fn test_dictionary_tracker_reset() {
4193        let data_gen = IpcDataGenerator::default();
4194        let mut dictionary_tracker = DictionaryTracker::new(false);
4195        let writer_options = IpcWriteOptions::default();
4196        let mut compression_ctx = CompressionContext::default();
4197
4198        let schema = Arc::new(Schema::new(vec![Field::new(
4199            "a",
4200            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
4201            false,
4202        )]));
4203
4204        let mut write_single_batch_stream =
4205            |batch: RecordBatch, dict_tracker: &mut DictionaryTracker| -> Vec<u8> {
4206                let mut buffer = Vec::new();
4207
4208                // create a new IPC stream:
4209                let stream_header = data_gen.schema_to_bytes_with_dictionary_tracker(
4210                    &schema,
4211                    dict_tracker,
4212                    &writer_options,
4213                );
4214                _ = write_message(&mut buffer, stream_header, &writer_options).unwrap();
4215
4216                let (encoded_dicts, encoded_batch) = data_gen
4217                    .encode(&batch, dict_tracker, &writer_options, &mut compression_ctx)
4218                    .unwrap();
4219                for encoded_dict in encoded_dicts {
4220                    _ = write_message(&mut buffer, encoded_dict, &writer_options).unwrap();
4221                }
4222                _ = write_message(&mut buffer, encoded_batch, &writer_options).unwrap();
4223
4224                buffer
4225            };
4226
4227        let batch1 = RecordBatch::try_new(
4228            schema.clone(),
4229            vec![Arc::new(DictionaryArray::new(
4230                UInt8Array::from_iter_values([0]),
4231                Arc::new(StringArray::from_iter_values(["a"])),
4232            ))],
4233        )
4234        .unwrap();
4235        let buffer = write_single_batch_stream(batch1.clone(), &mut dictionary_tracker);
4236
4237        // ensure we can read the stream back
4238        let mut reader = StreamReader::try_new(Cursor::new(buffer), None).unwrap();
4239        let read_batch = reader.next().unwrap().unwrap();
4240        assert_eq!(read_batch, batch1);
4241
4242        // reset the dictionary tracker so it can be reused for the next stream
4243        dictionary_tracker.clear();
4244
4245        // now write a 2nd stream and ensure we can also read it:
4246        let batch2 = RecordBatch::try_new(
4247            schema.clone(),
4248            vec![Arc::new(DictionaryArray::new(
4249                UInt8Array::from_iter_values([0]),
4250                Arc::new(StringArray::from_iter_values(["a"])),
4251            ))],
4252        )
4253        .unwrap();
4254        let buffer = write_single_batch_stream(batch2.clone(), &mut dictionary_tracker);
4255        let mut reader = StreamReader::try_new(Cursor::new(buffer), None).unwrap();
4256        let read_batch = reader.next().unwrap().unwrap();
4257        assert_eq!(read_batch, batch2);
4258    }
4259}