// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Arrow IPC File and Stream Writers
//!
//! # Notes
//!
//! [`FileWriter`] and [`StreamWriter`] have similar interfaces,
//! however the [`FileWriter`] produces the IPC file format, which must be read
//! with a reader that supports [`Seek`]ing, whereas the stream format produced
//! by [`StreamWriter`] can be read sequentially
//!
//! [`Seek`]: std::io::Seek

use std::cmp::min;
use std::collections::HashMap;
use std::io::{BufWriter, Write};
use std::mem::size_of;
use std::sync::Arc;

use flatbuffers::FlatBufferBuilder;

use arrow_array::builder::BufferBuilder;
use arrow_array::cast::*;
use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
use arrow_array::*;
use arrow_buffer::bit_util;
use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
use arrow_data::{layout, ArrayData, ArrayDataBuilder, BufferSpec};
use arrow_schema::*;

use crate::compression::CompressionCodec;
use crate::convert::IpcSchemaEncoder;
use crate::CONTINUATION_MARKER;

/// IPC write options used to control the behaviour of the [`IpcDataGenerator`]
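///
/// # Example
///
/// A small, illustrative sketch of building non-default options (the values
/// here are arbitrary):
///
/// ```
/// # use arrow_ipc::writer::IpcWriteOptions;
/// # use arrow_ipc::MetadataVersion;
/// // 8-byte alignment, current (non-legacy) format, metadata V5
/// let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
/// ```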
#[derive(Debug, Clone)]
pub struct IpcWriteOptions {
    /// Write padding after memory buffers to this multiple of bytes.
    /// Must be 8, 16, 32, or 64 - defaults to 64.
    alignment: u8,
    /// The legacy format is for releases before 0.15.0, and uses metadata V4
    write_legacy_ipc_format: bool,
    /// The metadata version to write. The Rust IPC writer supports V4+
    ///
    /// *Default versions per crate*
    ///
    /// When creating the default IpcWriteOptions, the following metadata versions are used:
    ///
    /// * version `2.0.0`: V4, with legacy format enabled
    /// * version `4.0.0`: V5
    metadata_version: crate::MetadataVersion,
    /// Compression, if desired. Will result in a runtime error
    /// if the corresponding feature is not enabled
    batch_compression_type: Option<crate::CompressionType>,
    /// How to handle updating dictionaries in IPC messages
    dictionary_handling: DictionaryHandling,
}

impl IpcWriteOptions {
    /// Configures compression when writing IPC files.
    ///
    /// Will result in a runtime error if the corresponding feature
    /// is not enabled
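    ///
    /// For example, a brief sketch of requesting LZ4 compression (actually
    /// writing compressed batches additionally requires the corresponding
    /// crate feature, e.g. `lz4`, to be enabled):
    ///
    /// ```
    /// # use arrow_ipc::writer::IpcWriteOptions;
    /// # use arrow_ipc::CompressionType;
    /// let options = IpcWriteOptions::default()
    ///     .try_with_compression(Some(CompressionType::LZ4_FRAME))
    ///     .unwrap();
    /// ```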
    pub fn try_with_compression(
        mut self,
        batch_compression_type: Option<crate::CompressionType>,
    ) -> Result<Self, ArrowError> {
        self.batch_compression_type = batch_compression_type;

        if self.batch_compression_type.is_some()
            && self.metadata_version < crate::MetadataVersion::V5
        {
            return Err(ArrowError::InvalidArgumentError(
                "Compression only supported in metadata v5 and above".to_string(),
            ));
        }
        Ok(self)
    }
    /// Try to create IpcWriteOptions, checking for incompatible settings
    pub fn try_new(
        alignment: usize,
        write_legacy_ipc_format: bool,
        metadata_version: crate::MetadataVersion,
    ) -> Result<Self, ArrowError> {
        let is_alignment_valid =
            alignment == 8 || alignment == 16 || alignment == 32 || alignment == 64;
        if !is_alignment_valid {
            return Err(ArrowError::InvalidArgumentError(
                "Alignment should be 8, 16, 32, or 64.".to_string(),
            ));
        }
        let alignment: u8 = u8::try_from(alignment).expect("range already checked");
        match metadata_version {
            crate::MetadataVersion::V1
            | crate::MetadataVersion::V2
            | crate::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
                "Writing IPC metadata version 3 and lower not supported".to_string(),
            )),
            #[allow(deprecated)]
            crate::MetadataVersion::V4 => Ok(Self {
                alignment,
                write_legacy_ipc_format,
                metadata_version,
                batch_compression_type: None,
                dictionary_handling: DictionaryHandling::default(),
            }),
            crate::MetadataVersion::V5 => {
                if write_legacy_ipc_format {
                    Err(ArrowError::InvalidArgumentError(
                        "Legacy IPC format only supported on metadata version 4".to_string(),
                    ))
                } else {
                    Ok(Self {
                        alignment,
                        write_legacy_ipc_format,
                        metadata_version,
                        batch_compression_type: None,
                        dictionary_handling: DictionaryHandling::default(),
                    })
                }
            }
            z => Err(ArrowError::InvalidArgumentError(format!(
                "Unsupported crate::MetadataVersion {z:?}"
            ))),
        }
    }

    /// Configure how dictionaries are handled in IPC messages
    pub fn with_dictionary_handling(mut self, dictionary_handling: DictionaryHandling) -> Self {
        self.dictionary_handling = dictionary_handling;
        self
    }
}

impl Default for IpcWriteOptions {
    fn default() -> Self {
        Self {
            alignment: 64,
            write_legacy_ipc_format: false,
            metadata_version: crate::MetadataVersion::V5,
            batch_compression_type: None,
            dictionary_handling: DictionaryHandling::default(),
        }
    }
}

#[derive(Debug, Default)]
/// Handles low level details of encoding [`Array`] and [`Schema`] into the
/// [Arrow IPC Format].
///
/// # Example
/// ```
/// # fn run() {
/// # use std::sync::Arc;
/// # use arrow_array::UInt64Array;
/// # use arrow_array::RecordBatch;
/// # use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
///
/// // Create a record batch
/// let batch = RecordBatch::try_from_iter(vec![
///  ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _)
/// ]).unwrap();
///
/// // Error if dictionary ids are replaced.
/// let error_on_replacement = true;
/// let options = IpcWriteOptions::default();
/// let mut dictionary_tracker = DictionaryTracker::new(error_on_replacement);
///
/// // encode the batch into zero or more encoded dictionaries
/// // and the data for the actual array.
/// let data_gen = IpcDataGenerator::default();
/// let (encoded_dictionaries, encoded_message) = data_gen
///   .encoded_batch(&batch, &mut dictionary_tracker, &options)
///   .unwrap();
/// # }
/// ```
///
/// [Arrow IPC Format]: https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc
pub struct IpcDataGenerator {}

impl IpcDataGenerator {
    /// Converts a schema to an IPC message, assigning dictionary IDs from `dictionary_tracker`,
    /// and returns it encoded inside [EncodedData] as a flatbuffer.
    pub fn schema_to_bytes_with_dictionary_tracker(
        &self,
        schema: &Schema,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
    ) -> EncodedData {
        let mut fbb = FlatBufferBuilder::new();
        let schema = {
            let fb = IpcSchemaEncoder::new()
                .with_dictionary_tracker(dictionary_tracker)
                .schema_to_fb_offset(&mut fbb, schema);
            fb.as_union_value()
        };

        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::Schema);
        message.add_bodyLength(0);
        message.add_header(schema);
        // TODO: custom metadata
        let data = message.finish();
        fbb.finish(data, None);

        let data = fbb.finished_data();
        EncodedData {
            ipc_message: data.to_vec(),
            arrow_data: vec![],
        }
    }

    fn _encode_dictionaries<I: Iterator<Item = i64>>(
        &self,
        column: &ArrayRef,
        encoded_dictionaries: &mut Vec<EncodedData>,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
        dict_id: &mut I,
    ) -> Result<(), ArrowError> {
        match column.data_type() {
            DataType::Struct(fields) => {
                let s = as_struct_array(column);
                for (field, column) in fields.iter().zip(s.columns()) {
                    self.encode_dictionaries(
                        field,
                        column,
                        encoded_dictionaries,
                        dictionary_tracker,
                        write_options,
                        dict_id,
                    )?;
                }
            }
            DataType::RunEndEncoded(_, values) => {
                let data = column.to_data();
                if data.child_data().len() != 2 {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "The run encoded array should have exactly two child arrays. Found {}",
                        data.child_data().len()
                    )));
                }
                // The run_ends array is not expected to be dictionary encoded. Hence encode dictionaries
                // only for the values array.
                let values_array = make_array(data.child_data()[1].clone());
                self.encode_dictionaries(
                    values,
                    &values_array,
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::List(field) => {
                let list = as_list_array(column);
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::LargeList(field) => {
                let list = as_large_list_array(column);
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::FixedSizeList(field, _) => {
                let list = column
                    .as_any()
                    .downcast_ref::<FixedSizeListArray>()
                    .expect("Unable to downcast to fixed size list array");
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::Map(field, _) => {
                let map_array = as_map_array(column);

                let (keys, values) = match field.data_type() {
                    DataType::Struct(fields) if fields.len() == 2 => (&fields[0], &fields[1]),
                    _ => panic!("Incorrect field data type {:?}", field.data_type()),
                };

                // keys
                self.encode_dictionaries(
                    keys,
                    map_array.keys(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;

                // values
                self.encode_dictionaries(
                    values,
                    map_array.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::Union(fields, _) => {
                let union = as_union_array(column);
                for (type_id, field) in fields.iter() {
                    let column = union.child(type_id);
                    self.encode_dictionaries(
                        field,
                        column,
                        encoded_dictionaries,
                        dictionary_tracker,
                        write_options,
                        dict_id,
                    )?;
                }
            }
            _ => (),
        }

        Ok(())
    }

    fn encode_dictionaries<I: Iterator<Item = i64>>(
        &self,
        field: &Field,
        column: &ArrayRef,
        encoded_dictionaries: &mut Vec<EncodedData>,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
        dict_id_seq: &mut I,
    ) -> Result<(), ArrowError> {
        match column.data_type() {
            DataType::Dictionary(_key_type, _value_type) => {
                let dict_data = column.to_data();
                let dict_values = &dict_data.child_data()[0];

                let values = make_array(dict_data.child_data()[0].clone());

                self._encode_dictionaries(
                    &values,
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id_seq,
                )?;

                // It's important to only take the dict_id at this point, because the dict ID
                // sequence is assigned depth-first, so we need to first encode children and have
                // them take their assigned dict IDs before we take the dict ID for this field.
                let dict_id = dict_id_seq.next().ok_or_else(|| {
                    ArrowError::IpcError(format!("no dict id for field {}", field.name()))
                })?;

                match dictionary_tracker.insert_column(
                    dict_id,
                    column,
                    write_options.dictionary_handling,
                )? {
                    DictionaryUpdate::None => {}
                    DictionaryUpdate::New | DictionaryUpdate::Replaced => {
                        encoded_dictionaries.push(self.dictionary_batch_to_bytes(
                            dict_id,
                            dict_values,
                            write_options,
                            false,
                        )?);
                    }
                    DictionaryUpdate::Delta(data) => {
                        encoded_dictionaries.push(self.dictionary_batch_to_bytes(
                            dict_id,
                            &data,
                            write_options,
                            true,
                        )?);
                    }
                }
            }
            _ => self._encode_dictionaries(
                column,
                encoded_dictionaries,
                dictionary_tracker,
                write_options,
                dict_id_seq,
            )?,
        }

        Ok(())
    }

    /// Encodes a batch to a number of [EncodedData] items (dictionary batches + the record batch).
    ///
    /// The [DictionaryTracker] keeps track of dictionaries with new `dict_id`s (so they are only sent once).
    /// Make sure the [DictionaryTracker] is initialized at the start of the stream.
    pub fn encoded_batch(
        &self,
        batch: &RecordBatch,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
        let schema = batch.schema();
        let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len());

        let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter();

        for (i, field) in schema.fields().iter().enumerate() {
            let column = batch.column(i);
            self.encode_dictionaries(
                field,
                column,
                &mut encoded_dictionaries,
                dictionary_tracker,
                write_options,
                &mut dict_id,
            )?;
        }

        let encoded_message = self.record_batch_to_bytes(batch, write_options)?;
        Ok((encoded_dictionaries, encoded_message))
    }

    /// Write a `RecordBatch` into two sets of bytes, one for the header (crate::Message) and the
    /// other for the batch's data
    fn record_batch_to_bytes(
        &self,
        batch: &RecordBatch,
        write_options: &IpcWriteOptions,
    ) -> Result<EncodedData, ArrowError> {
        let mut fbb = FlatBufferBuilder::new();

        let mut nodes: Vec<crate::FieldNode> = vec![];
        let mut buffers: Vec<crate::Buffer> = vec![];
        let mut arrow_data: Vec<u8> = vec![];
        let mut offset = 0;

        // get the type of compression
        let batch_compression_type = write_options.batch_compression_type;

        let compression = batch_compression_type.map(|batch_compression_type| {
            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
            c.add_method(crate::BodyCompressionMethod::BUFFER);
            c.add_codec(batch_compression_type);
            c.finish()
        });

        let compression_codec: Option<CompressionCodec> =
            batch_compression_type.map(TryInto::try_into).transpose()?;

        let mut variadic_buffer_counts = vec![];

        for array in batch.columns() {
            let array_data = array.to_data();
            offset = write_array_data(
                &array_data,
                &mut buffers,
                &mut arrow_data,
                &mut nodes,
                offset,
                array.len(),
                array.null_count(),
                compression_codec,
                write_options,
            )?;

            append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data);
        }
        // pad the tail of body data
        let len = arrow_data.len();
        let pad_len = pad_to_alignment(write_options.alignment, len);
        arrow_data.extend_from_slice(&PADDING[..pad_len]);

        // write data
        let buffers = fbb.create_vector(&buffers);
        let nodes = fbb.create_vector(&nodes);
        let variadic_buffer = if variadic_buffer_counts.is_empty() {
            None
        } else {
            Some(fbb.create_vector(&variadic_buffer_counts))
        };

        let root = {
            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
            batch_builder.add_length(batch.num_rows() as i64);
            batch_builder.add_nodes(nodes);
            batch_builder.add_buffers(buffers);
            if let Some(c) = compression {
                batch_builder.add_compression(c);
            }

            if let Some(v) = variadic_buffer {
                batch_builder.add_variadicBufferCounts(v);
            }
            let b = batch_builder.finish();
            b.as_union_value()
        };
        // create a crate::Message
        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::RecordBatch);
        message.add_bodyLength(arrow_data.len() as i64);
        message.add_header(root);
        let root = message.finish();
        fbb.finish(root, None);
        let finished_data = fbb.finished_data();

        Ok(EncodedData {
            ipc_message: finished_data.to_vec(),
            arrow_data,
        })
    }

    /// Write dictionary values into two sets of bytes, one for the header (crate::Message) and the
    /// other for the data
    fn dictionary_batch_to_bytes(
        &self,
        dict_id: i64,
        array_data: &ArrayData,
        write_options: &IpcWriteOptions,
        is_delta: bool,
    ) -> Result<EncodedData, ArrowError> {
        let mut fbb = FlatBufferBuilder::new();

        let mut nodes: Vec<crate::FieldNode> = vec![];
        let mut buffers: Vec<crate::Buffer> = vec![];
        let mut arrow_data: Vec<u8> = vec![];

        // get the type of compression
        let batch_compression_type = write_options.batch_compression_type;

        let compression = batch_compression_type.map(|batch_compression_type| {
            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
            c.add_method(crate::BodyCompressionMethod::BUFFER);
            c.add_codec(batch_compression_type);
            c.finish()
        });

        let compression_codec: Option<CompressionCodec> = batch_compression_type
            .map(|batch_compression_type| batch_compression_type.try_into())
            .transpose()?;

        write_array_data(
            array_data,
            &mut buffers,
            &mut arrow_data,
            &mut nodes,
            0,
            array_data.len(),
            array_data.null_count(),
            compression_codec,
            write_options,
        )?;

        let mut variadic_buffer_counts = vec![];
        append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);

        // pad the tail of body data
        let len = arrow_data.len();
        let pad_len = pad_to_alignment(write_options.alignment, len);
        arrow_data.extend_from_slice(&PADDING[..pad_len]);

        // write data
        let buffers = fbb.create_vector(&buffers);
        let nodes = fbb.create_vector(&nodes);
        let variadic_buffer = if variadic_buffer_counts.is_empty() {
            None
        } else {
            Some(fbb.create_vector(&variadic_buffer_counts))
        };

        let root = {
            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
            batch_builder.add_length(array_data.len() as i64);
            batch_builder.add_nodes(nodes);
            batch_builder.add_buffers(buffers);
            if let Some(c) = compression {
                batch_builder.add_compression(c);
            }
            if let Some(v) = variadic_buffer {
                batch_builder.add_variadicBufferCounts(v);
            }
            batch_builder.finish()
        };

        let root = {
            let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut fbb);
            batch_builder.add_id(dict_id);
            batch_builder.add_data(root);
            batch_builder.add_isDelta(is_delta);
            batch_builder.finish().as_union_value()
        };

        let root = {
            let mut message_builder = crate::MessageBuilder::new(&mut fbb);
            message_builder.add_version(write_options.metadata_version);
            message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
            message_builder.add_bodyLength(arrow_data.len() as i64);
            message_builder.add_header(root);
            message_builder.finish()
        };

        fbb.finish(root, None);
        let finished_data = fbb.finished_data();

        Ok(EncodedData {
            ipc_message: finished_data.to_vec(),
            arrow_data,
        })
    }
}

fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
    match array.data_type() {
        DataType::BinaryView | DataType::Utf8View => {
            // The spec documents that the counts include only the variadic buffers, not the view/null buffers.
            // https://arrow.apache.org/docs/format/Columnar.html#variadic-buffers
            counts.push(array.buffers().len() as i64 - 1);
        }
        DataType::Dictionary(_, _) => {
            // Do nothing
            // Dictionary types are handled in `encode_dictionaries`.
        }
        _ => {
            for child in array.child_data() {
                append_variadic_buffer_counts(counts, child)
            }
        }
    }
}

pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError> {
    match arr.data_type() {
        DataType::RunEndEncoded(k, _) => match k.data_type() {
            DataType::Int16 => {
                Ok(into_zero_offset_run_array(RunArray::<Int16Type>::from(arr))?.into_data())
            }
            DataType::Int32 => {
                Ok(into_zero_offset_run_array(RunArray::<Int32Type>::from(arr))?.into_data())
            }
            DataType::Int64 => {
                Ok(into_zero_offset_run_array(RunArray::<Int64Type>::from(arr))?.into_data())
            }
            d => unreachable!("Unexpected data type {d}"),
        },
        d => Err(ArrowError::InvalidArgumentError(format!(
            "The given array is not a run array. Data type of given array: {d}"
        ))),
    }
}

// Returns a `RunArray` with zero offset and length matching the last value
// in the run_ends array.
fn into_zero_offset_run_array<R: RunEndIndexType>(
    run_array: RunArray<R>,
) -> Result<RunArray<R>, ArrowError> {
    let run_ends = run_array.run_ends();
    if run_ends.offset() == 0 && run_ends.max_value() == run_ends.len() {
        return Ok(run_array);
    }

    // The physical index of the original run_ends array from which the `ArrayData` is sliced.
    let start_physical_index = run_ends.get_start_physical_index();

    // The physical index of the original run_ends array until which the `ArrayData` is sliced.
    let end_physical_index = run_ends.get_end_physical_index();

    let physical_length = end_physical_index - start_physical_index + 1;

    // build new run_ends array by subtracting offset from run ends.
    let offset = R::Native::usize_as(run_ends.offset());
    let mut builder = BufferBuilder::<R::Native>::new(physical_length);
    for run_end_value in &run_ends.values()[start_physical_index..end_physical_index] {
        builder.append(run_end_value.sub_wrapping(offset));
    }
    builder.append(R::Native::from_usize(run_array.len()).unwrap());
    let new_run_ends = unsafe {
        // Safety:
        // The function builds a valid run_ends array and hence need not be validated.
        ArrayDataBuilder::new(R::DATA_TYPE)
            .len(physical_length)
            .add_buffer(builder.finish())
            .build_unchecked()
    };

    // build new values by slicing physical indices.
    let new_values = run_array
        .values()
        .slice(start_physical_index, physical_length)
        .into_data();

    let builder = ArrayDataBuilder::new(run_array.data_type().clone())
        .len(run_array.len())
        .add_child_data(new_run_ends)
        .add_child_data(new_values);
    let array_data = unsafe {
        // Safety:
        //  This function builds a valid run array and hence can skip validation.
        builder.build_unchecked()
    };
    Ok(array_data.into())
}

/// Controls how dictionaries are handled in Arrow IPC messages
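///
/// A minimal sketch of opting into delta dictionaries (see the [`StreamWriter`]
/// docs for a complete round trip):
///
/// ```
/// # use arrow_ipc::writer::{DictionaryHandling, IpcWriteOptions};
/// let options = IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta);
/// ```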
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DictionaryHandling {
    /// Send the entire dictionary every time it is encountered (default)
    Resend,
    /// Send only new dictionary values since the last batch (delta encoding)
    ///
    /// When a dictionary is first encountered, the entire dictionary is sent.
    /// For subsequent batches, only values that are new (not previously sent)
    /// are transmitted with the `isDelta` flag set to true.
    Delta,
}

impl Default for DictionaryHandling {
    fn default() -> Self {
        Self::Resend
    }
}

/// Describes what kind of update took place after a call to [`DictionaryTracker::insert_column`].
#[derive(Debug, Clone)]
pub enum DictionaryUpdate {
    /// No dictionary was written, the dictionary was identical to what was already
    /// in the tracker.
    None,
    /// No dictionary was present in the tracker
    New,
    /// Dictionary was replaced with the new data
    Replaced,
    /// Dictionary was updated, ArrayData is the delta between old and new
    Delta(ArrayData),
}

/// Keeps track of dictionaries that have been written, to avoid emitting the same dictionary
/// multiple times.
///
/// Can optionally error if an update to an existing dictionary is attempted, which
/// isn't allowed in the `FileWriter`.
#[derive(Debug)]
pub struct DictionaryTracker {
    written: HashMap<i64, ArrayData>,
    dict_ids: Vec<i64>,
    error_on_replacement: bool,
}

impl DictionaryTracker {
    /// Create a new [`DictionaryTracker`].
    ///
    /// If `error_on_replacement`
    /// is true, an error will be generated if an update to an
    /// existing dictionary is attempted.
    pub fn new(error_on_replacement: bool) -> Self {
        #[allow(deprecated)]
        Self {
            written: HashMap::new(),
            dict_ids: Vec::new(),
            error_on_replacement,
        }
    }

    /// Record and return the next dictionary ID.
    pub fn next_dict_id(&mut self) -> i64 {
        let next = self
            .dict_ids
            .last()
            .copied()
            .map(|i| i + 1)
            .unwrap_or_default();

        self.dict_ids.push(next);
        next
    }

    /// Return the sequence of dictionary IDs in the order they should be observed while
    /// traversing the schema
    pub fn dict_id(&mut self) -> &[i64] {
        &self.dict_ids
    }

    /// Keep track of the dictionary with the given ID and values. Behavior:
    ///
    /// * If this ID has been written already and has the same data, return `Ok(false)` to indicate
    ///   that the dictionary was not actually inserted (because it's already been seen).
    /// * If this ID has been written already but with different data, and this tracker is
    ///   configured to return an error, return an error.
    /// * If the tracker has not been configured to error on replacement or this dictionary
    ///   has never been seen before, return `Ok(true)` to indicate that the dictionary was just
    ///   inserted.
    #[deprecated(since = "56.1.0", note = "Use `insert_column` instead")]
    pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> {
        let dict_data = column.to_data();
        let dict_values = &dict_data.child_data()[0];

        // If a dictionary with this id was already emitted, check if it was the same.
        if let Some(last) = self.written.get(&dict_id) {
            if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
                // Same dictionary values => no need to emit it again
                return Ok(false);
            }
            if self.error_on_replacement {
                // If error on replacement perform a logical comparison
                if last.child_data()[0] == *dict_values {
                    // Same dictionary values => no need to emit it again
                    return Ok(false);
                }
                return Err(ArrowError::InvalidArgumentError(
                    "Dictionary replacement detected when writing IPC file format. \
                     Arrow IPC files only support a single dictionary for a given field \
                     across all batches."
                        .to_string(),
                ));
            }
        }

        self.written.insert(dict_id, dict_data);
        Ok(true)
    }

    /// Keep track of the dictionary with the given ID and values. The return
    /// value indicates what, if any, update to the internal map took place
    /// and how it should be interpreted based on the `dict_handling` parameter.
    ///
    /// # Returns
    ///
    /// * `Ok(DictionaryUpdate::None)` - If the dictionary was previously written
    ///   with identical data, so nothing needs to be emitted
    /// * `Ok(DictionaryUpdate::New)` - If the dictionary was not previously written
    /// * `Ok(DictionaryUpdate::Replaced)` - If the dictionary was previously written
    ///   with completely different data, or if the data is a delta of the existing,
    ///   but with `dict_handling` set to `DictionaryHandling::Resend`
    /// * `Ok(DictionaryUpdate::Delta)` - If the dictionary was previously written, but
    ///   the new data is a delta of the old and the `dict_handling` is set to
    ///   `DictionaryHandling::Delta`
    /// * `Err(e)` - If the dictionary was previously written with different data,
    ///   and `error_on_replacement` is set to `true`.
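    ///
    /// # Example
    ///
    /// A minimal, illustrative sketch of tracking a dictionary column:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{types::Int32Type, ArrayRef, DictionaryArray};
    /// # use arrow_ipc::writer::{DictionaryHandling, DictionaryTracker, DictionaryUpdate};
    /// let column: ArrayRef = Arc::new(
    ///     vec!["a", "b", "a"].into_iter().collect::<DictionaryArray<Int32Type>>(),
    /// );
    /// let mut tracker = DictionaryTracker::new(false);
    /// let dict_id = tracker.next_dict_id();
    /// let update = tracker
    ///     .insert_column(dict_id, &column, DictionaryHandling::Resend)
    ///     .unwrap();
    /// assert!(matches!(update, DictionaryUpdate::New));
    /// ```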
    pub fn insert_column(
        &mut self,
        dict_id: i64,
        column: &ArrayRef,
        dict_handling: DictionaryHandling,
    ) -> Result<DictionaryUpdate, ArrowError> {
        let new_data = column.to_data();
        let new_values = &new_data.child_data()[0];

        // If there is no existing dictionary with this ID, we always insert
        let Some(old) = self.written.get(&dict_id) else {
            self.written.insert(dict_id, new_data);
            return Ok(DictionaryUpdate::New);
        };

        // Fast path - If the array data points to the same buffer as the
        // existing then they're the same.
        let old_values = &old.child_data()[0];
        if ArrayData::ptr_eq(old_values, new_values) {
            return Ok(DictionaryUpdate::None);
        }

        // Slow path - Compare the dictionaries value by value
        let comparison = compare_dictionaries(old_values, new_values);
        if matches!(comparison, DictionaryComparison::Equal) {
            return Ok(DictionaryUpdate::None);
        }

        const REPLACEMENT_ERROR: &str =
            "Dictionary replacement detected when writing IPC file format. \
                 Arrow IPC files only support a single dictionary for a given field \
                 across all batches.";

        match comparison {
            DictionaryComparison::NotEqual => {
                if self.error_on_replacement {
                    return Err(ArrowError::InvalidArgumentError(
                        REPLACEMENT_ERROR.to_string(),
                    ));
                }

                self.written.insert(dict_id, new_data);
                Ok(DictionaryUpdate::Replaced)
            }
            DictionaryComparison::Delta => match dict_handling {
                DictionaryHandling::Resend => {
                    if self.error_on_replacement {
                        return Err(ArrowError::InvalidArgumentError(
                            REPLACEMENT_ERROR.to_string(),
                        ));
                    }

                    self.written.insert(dict_id, new_data);
                    Ok(DictionaryUpdate::Replaced)
                }
                DictionaryHandling::Delta => {
                    let delta =
                        new_values.slice(old_values.len(), new_values.len() - old_values.len());
                    self.written.insert(dict_id, new_data);
                    Ok(DictionaryUpdate::Delta(delta))
                }
            },
            DictionaryComparison::Equal => unreachable!("Already checked equal case"),
        }
    }
}

/// Describes how two dictionary arrays compare to each other.
#[derive(Debug, Clone)]
enum DictionaryComparison {
    /// Neither a delta, nor an exact match
    NotEqual,
    /// Exact element-wise match
    Equal,
    /// The two arrays are dictionary deltas of each other, meaning the first
    /// is a prefix of the second.
    Delta,
}

// Compares two dictionaries and returns a [`DictionaryComparison`].
fn compare_dictionaries(old: &ArrayData, new: &ArrayData) -> DictionaryComparison {
    // Check for exact match
    let existing_len = old.len();
    let new_len = new.len();
    if existing_len == new_len {
        if *old == *new {
            return DictionaryComparison::Equal;
        } else {
            return DictionaryComparison::NotEqual;
        }
    }

    // Can't be a delta if the new is shorter than the existing
    if new_len < existing_len {
        return DictionaryComparison::NotEqual;
    }

    // Check for delta
    if new.slice(0, existing_len) == *old {
        return DictionaryComparison::Delta;
    }

    DictionaryComparison::NotEqual
}

/// Arrow File Writer
///
/// Writes Arrow [`RecordBatch`]es in the [IPC File Format].
///
/// # See Also
///
/// * [`StreamWriter`] for writing IPC Streams
///
/// # Example
/// ```
/// # use arrow_array::record_batch;
/// # use arrow_ipc::writer::FileWriter;
/// # let mut file = vec![]; // mimic a file for the example
/// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// // create a new writer, the schema must be known in advance
/// let mut writer = FileWriter::try_new(&mut file, &batch.schema()).unwrap();
/// // write each batch to the underlying writer
/// writer.write(&batch).unwrap();
/// // When all batches are written, call finish to flush all buffers
/// writer.finish().unwrap();
/// ```
/// [IPC File Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format
pub struct FileWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// A reference to the schema, used in validating record batches
    schema: SchemaRef,
    /// The running byte offset within the file, used to record each block's offset for random access
    block_offsets: usize,
    /// Dictionary blocks that will be written as part of the IPC footer
    dictionary_blocks: Vec<crate::Block>,
    /// Record blocks that will be written as part of the IPC footer
    record_blocks: Vec<crate::Block>,
    /// Whether the writer footer has been written, and the writer is finished
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,
    /// User level customized metadata
    custom_metadata: HashMap<String, String>,

    data_gen: IpcDataGenerator,
}

impl<W: Write> FileWriter<BufWriter<W>> {
    /// Try to create a new file writer with the writer wrapped in a BufWriter.
    ///
    /// See [`FileWriter::try_new`] for an unbuffered version.
    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        Self::try_new(BufWriter::new(writer), schema)
    }
}

impl<W: Write> FileWriter<W> {
    /// Try to create a new writer, with the schema written as part of the header
    ///
    /// Note the created writer is not buffered. See [`FileWriter::try_new_buffered`] for details.
    ///
    /// # Errors
    ///
    /// An [`Err`](Result::Err) may be returned if writing the header to the writer fails.
    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        let write_options = IpcWriteOptions::default();
        Self::try_new_with_options(writer, schema, write_options)
    }

    /// Try to create a new writer with IpcWriteOptions
    ///
    /// Note the created writer is not buffered. See [`FileWriter::try_new_buffered`] for details.
    ///
    /// # Errors
    ///
    /// An [`Err`](Result::Err) may be returned if writing the header to the writer fails.
    pub fn try_new_with_options(
        mut writer: W,
        schema: &Schema,
        write_options: IpcWriteOptions,
    ) -> Result<Self, ArrowError> {
        let data_gen = IpcDataGenerator::default();
        // write the magic bytes, padding the header to the alignment boundary
        let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
        let header_size = super::ARROW_MAGIC.len() + pad_len;
        writer.write_all(&super::ARROW_MAGIC)?;
        writer.write_all(&PADDING[..pad_len])?;
        // write the schema, set the written bytes to the schema + header
        let mut dictionary_tracker = DictionaryTracker::new(true);
        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
            schema,
            &mut dictionary_tracker,
            &write_options,
        );
        let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
        Ok(Self {
            writer,
            write_options,
            schema: Arc::new(schema.clone()),
            block_offsets: meta + data + header_size,
            dictionary_blocks: vec![],
            record_blocks: vec![],
            finished: false,
            dictionary_tracker,
            custom_metadata: HashMap::new(),
            data_gen,
        })
    }

    /// Adds a key-value pair to the [FileWriter]'s custom metadata
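    ///
    /// A short sketch (the key and value here are arbitrary):
    ///
    /// ```
    /// # use arrow_ipc::writer::FileWriter;
    /// # use arrow_schema::Schema;
    /// let mut buffer = vec![];
    /// let mut writer = FileWriter::try_new(&mut buffer, &Schema::empty()).unwrap();
    /// writer.write_metadata("created_by", "arrow_ipc example");
    /// writer.finish().unwrap();
    /// ```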
    pub fn write_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
        self.custom_metadata.insert(key.into(), value.into());
    }

    /// Write a record batch to the file
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write record batch to file writer as it is closed".to_string(),
            ));
        }

        let (encoded_dictionaries, encoded_message) = self.data_gen.encoded_batch(
            batch,
            &mut self.dictionary_tracker,
            &self.write_options,
        )?;

        for encoded_dictionary in encoded_dictionaries {
            let (meta, data) =
                write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;

            let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
            self.dictionary_blocks.push(block);
            self.block_offsets += meta + data;
        }

        let (meta, data) = write_message(&mut self.writer, encoded_message, &self.write_options)?;

        // add a record block for the footer
        let block = crate::Block::new(
            self.block_offsets as i64,
            meta as i32, // TODO: is this still applicable?
            data as i64,
        );
        self.record_blocks.push(block);
        self.block_offsets += meta + data;
        Ok(())
    }

    /// Write footer and closing tag, then mark the writer as done
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write footer to file writer as it is closed".to_string(),
            ));
        }

        // write EOS
        write_continuation(&mut self.writer, &self.write_options, 0)?;

        let mut fbb = FlatBufferBuilder::new();
        let dictionaries = fbb.create_vector(&self.dictionary_blocks);
        let record_batches = fbb.create_vector(&self.record_blocks);
        let mut dictionary_tracker = DictionaryTracker::new(true);
        let schema = IpcSchemaEncoder::new()
            .with_dictionary_tracker(&mut dictionary_tracker)
            .schema_to_fb_offset(&mut fbb, &self.schema);
        let fb_custom_metadata = (!self.custom_metadata.is_empty())
            .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));

        let root = {
            let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
            footer_builder.add_version(self.write_options.metadata_version);
            footer_builder.add_schema(schema);
            footer_builder.add_dictionaries(dictionaries);
            footer_builder.add_recordBatches(record_batches);
            if let Some(fb_custom_metadata) = fb_custom_metadata {
                footer_builder.add_custom_metadata(fb_custom_metadata);
            }
            footer_builder.finish()
        };
        fbb.finish(root, None);
        let footer_data = fbb.finished_data();
        self.writer.write_all(footer_data)?;
        self.writer
            .write_all(&(footer_data.len() as i32).to_le_bytes())?;
        self.writer.write_all(&super::ARROW_MAGIC)?;
        self.writer.flush()?;
        self.finished = true;

        Ok(())
    }

    /// Returns the arrow [`SchemaRef`] for this arrow file.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Gets a reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }

    /// Gets a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }

    /// Flush the underlying writer.
    ///
    /// Both the BufWriter and the underlying writer are flushed.
    pub fn flush(&mut self) -> Result<(), ArrowError> {
        self.writer.flush()?;
        Ok(())
    }

    /// Unwraps the underlying writer.
    ///
    /// The writer is flushed and the FileWriter is finished before returning.
    ///
    /// # Errors
    ///
    /// An [`Err`](Result::Err) may be returned if an error occurs while finishing the FileWriter
    /// or while flushing the writer.
    pub fn into_inner(mut self) -> Result<W, ArrowError> {
        if !self.finished {
            // `finish` flushes the writer.
            self.finish()?;
        }
        Ok(self.writer)
    }
}

impl<W: Write> RecordBatchWriter for FileWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}

/// Arrow Stream Writer
///
/// Writes Arrow [`RecordBatch`]es to bytes using the [IPC Streaming Format].
///
/// # See Also
///
/// * [`FileWriter`] for writing IPC Files
///
/// # Example - Basic usage
/// ```
/// # use arrow_array::record_batch;
/// # use arrow_ipc::writer::StreamWriter;
/// # let mut stream = vec![]; // mimic a stream for the example
/// let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// // create a new writer, the schema must be known in advance
/// let mut writer = StreamWriter::try_new(&mut stream, &batch.schema()).unwrap();
/// // write each batch to the underlying stream
/// writer.write(&batch).unwrap();
/// // When all batches are written, call finish to flush all buffers
/// writer.finish().unwrap();
/// ```
/// # Example - Efficient delta dictionaries
/// ```
/// # use arrow_array::record_batch;
/// # use arrow_ipc::writer::{StreamWriter, IpcWriteOptions};
/// # use arrow_ipc::writer::DictionaryHandling;
/// # use arrow_schema::{DataType, Field, Schema, SchemaRef};
/// # use arrow_array::{
/// #    builder::StringDictionaryBuilder, types::Int32Type, Array, ArrayRef, DictionaryArray,
/// #    RecordBatch, StringArray,
/// # };
/// # use std::sync::Arc;
///
/// let schema = Arc::new(Schema::new(vec![Field::new(
///    "col1",
///    DataType::Dictionary(Box::from(DataType::Int32), Box::from(DataType::Utf8)),
///    true,
/// )]));
///
/// let mut builder = StringDictionaryBuilder::<arrow_array::types::Int32Type>::new();
///
/// // `finish_preserve_values` will keep the dictionary values along with their
/// // key assignments so that they can be re-used in the next batch.
/// builder.append("a").unwrap();
/// builder.append("b").unwrap();
/// let array1 = builder.finish_preserve_values();
/// let batch1 = RecordBatch::try_new(schema.clone(), vec![Arc::new(array1) as ArrayRef]).unwrap();
///
/// // In this batch, 'a' will have the same dictionary key as 'a' in the previous batch,
/// // and 'd' will take the next available key.
/// builder.append("a").unwrap();
/// builder.append("d").unwrap();
/// let array2 = builder.finish_preserve_values();
/// let batch2 = RecordBatch::try_new(schema.clone(), vec![Arc::new(array2) as ArrayRef]).unwrap();
///
/// let mut stream = vec![];
/// // You must set `.with_dictionary_handling(DictionaryHandling::Delta)` to
/// // enable delta dictionaries in the writer
/// let options = IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta);
/// let mut writer = StreamWriter::try_new_with_options(&mut stream, &schema, options).unwrap();
///
/// // When writing the first batch, a dictionary message with 'a' and 'b' will be written
/// // prior to the record batch.
/// writer.write(&batch1).unwrap();
/// // With the second batch only a delta dictionary with 'd' will be written
/// // prior to the record batch. This is only possible with `finish_preserve_values`.
/// // Without it, 'a' and 'd' in this batch would have different keys than the
/// // first batch and so we'd have to send a replacement dictionary with new keys
/// // for both.
/// writer.write(&batch2).unwrap();
/// writer.finish().unwrap();
/// ```
/// [IPC Streaming Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
pub struct StreamWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// Whether the writer footer has been written, and the writer is finished
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,

    data_gen: IpcDataGenerator,
}

impl<W: Write> StreamWriter<BufWriter<W>> {
    /// Try to create a new stream writer with the writer wrapped in a BufWriter.
    ///
    /// See [`StreamWriter::try_new`] for an unbuffered version.
    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        Self::try_new(BufWriter::new(writer), schema)
    }
}

impl<W: Write> StreamWriter<W> {
    /// Try to create a new writer, with the schema written as part of the header.
    ///
    /// Note that there is no internal buffering. See also [`StreamWriter::try_new_buffered`].
    ///
    /// # Errors
    ///
    /// An [`Err`](Result::Err) may be returned if writing the header to the writer fails.
    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        let write_options = IpcWriteOptions::default();
        Self::try_new_with_options(writer, schema, write_options)
    }

    /// Try to create a new writer with [`IpcWriteOptions`].
    ///
    /// # Errors
    ///
    /// An [`Err`](Result::Err) may be returned if writing the header to the writer fails.
    pub fn try_new_with_options(
        mut writer: W,
        schema: &Schema,
        write_options: IpcWriteOptions,
    ) -> Result<Self, ArrowError> {
        let data_gen = IpcDataGenerator::default();
        let mut dictionary_tracker = DictionaryTracker::new(false);

        // write the schema, set the written bytes to the schema
        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
            schema,
            &mut dictionary_tracker,
            &write_options,
        );
        write_message(&mut writer, encoded_message, &write_options)?;
        Ok(Self {
            writer,
            write_options,
            finished: false,
            dictionary_tracker,
            data_gen,
        })
    }

    /// Write a record batch to the stream
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write record batch to stream writer as it is closed".to_string(),
            ));
        }

        let (encoded_dictionaries, encoded_message) = self
            .data_gen
            .encoded_batch(batch, &mut self.dictionary_tracker, &self.write_options)
            .expect("StreamWriter is configured to not error on dictionary replacement");

        for encoded_dictionary in encoded_dictionaries {
            write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
        }

        write_message(&mut self.writer, encoded_message, &self.write_options)?;
        Ok(())
    }

    /// Write continuation bytes, and mark the stream as done
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write footer to stream writer as it is closed".to_string(),
            ));
        }

        write_continuation(&mut self.writer, &self.write_options, 0)?;

        self.finished = true;

        Ok(())
    }

    /// Gets a reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }

    /// Gets a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }

    /// Flush the underlying writer.
    ///
    /// Both the BufWriter and the underlying writer are flushed.
    pub fn flush(&mut self) -> Result<(), ArrowError> {
        self.writer.flush()?;
        Ok(())
    }

    /// Unwraps the underlying writer.
1406    ///
1407    /// The writer is flushed and the StreamWriter is finished before returning.
1408    ///
1409    /// # Errors
1410    ///
1411    /// An [`Err`](Result::Err) may be returned if an error occurs while finishing the StreamWriter
1412    /// or while flushing the writer.
1413    ///
1414    /// # Example
1415    ///
1416    /// ```
1417    /// # use arrow_ipc::writer::{StreamWriter, IpcWriteOptions};
1418    /// # use arrow_ipc::MetadataVersion;
1419    /// # use arrow_schema::{ArrowError, Schema};
1420    /// # fn main() -> Result<(), ArrowError> {
1421    /// // The result we expect from an empty schema
1422    /// let expected = vec![
1423    ///     255, 255, 255, 255,  48,   0,   0,   0,
1424    ///      16,   0,   0,   0,   0,   0,  10,   0,
1425    ///      12,   0,  10,   0,   9,   0,   4,   0,
1426    ///      10,   0,   0,   0,  16,   0,   0,   0,
1427    ///       0,   1,   4,   0,   8,   0,   8,   0,
1428    ///       0,   0,   4,   0,   8,   0,   0,   0,
1429    ///       4,   0,   0,   0,   0,   0,   0,   0,
1430    ///     255, 255, 255, 255,   0,   0,   0,   0
1431    /// ];
1432    ///
1433    /// let schema = Schema::empty();
1434    /// let buffer: Vec<u8> = Vec::new();
1435    /// let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)?;
1436    /// let stream_writer = StreamWriter::try_new_with_options(buffer, &schema, options)?;
1437    ///
1438    /// assert_eq!(stream_writer.into_inner()?, expected);
1439    /// # Ok(())
1440    /// # }
1441    /// ```
1442    pub fn into_inner(mut self) -> Result<W, ArrowError> {
1443        if !self.finished {
1444            // `finish` flushes.
1445            self.finish()?;
1446        }
1447        Ok(self.writer)
1448    }
1449}
1450
1451impl<W: Write> RecordBatchWriter for StreamWriter<W> {
1452    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1453        self.write(batch)
1454    }
1455
1456    fn close(mut self) -> Result<(), ArrowError> {
1457        self.finish()
1458    }
1459}
1460
1461/// Stores the encoded data, which is a `crate::Message`, and optional Arrow data
1462pub struct EncodedData {
1463    /// An encoded crate::Message
1464    pub ipc_message: Vec<u8>,
1465    /// Arrow buffers to be written, should be an empty vec for schema messages
1466    pub arrow_data: Vec<u8>,
1467}
1468/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
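///
/// The message is written as an Arrow IPC encapsulated message: an optional
/// continuation marker, a 4-byte little-endian metadata length, the metadata
/// flatbuffer, padding up to the configured alignment, and finally the body buffers.
///
/// A minimal sketch, using an empty [`EncodedData`] (no metadata bytes and no body)
/// purely to illustrate the returned lengths:
///
/// ```
/// # use arrow_ipc::writer::{write_message, EncodedData, IpcWriteOptions};
/// let encoded = EncodedData { ipc_message: vec![], arrow_data: vec![] };
/// let mut output = Vec::new();
/// let (meta_len, body_len) =
///     write_message(&mut output, encoded, &IpcWriteOptions::default()).unwrap();
/// // with no body, everything written is framing/padding for the (empty) metadata
/// assert_eq!(meta_len, output.len());
/// assert_eq!(body_len, 0);
/// ```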
1469pub fn write_message<W: Write>(
1470    mut writer: W,
1471    encoded: EncodedData,
1472    write_options: &IpcWriteOptions,
1473) -> Result<(usize, usize), ArrowError> {
1474    let arrow_data_len = encoded.arrow_data.len();
1475    if arrow_data_len % usize::from(write_options.alignment) != 0 {
1476        return Err(ArrowError::MemoryError(
1477            "Arrow data not aligned".to_string(),
1478        ));
1479    }
1480
1481    let a = usize::from(write_options.alignment - 1);
1482    let buffer = encoded.ipc_message;
1483    let flatbuf_size = buffer.len();
1484    let prefix_size = if write_options.write_legacy_ipc_format {
1485        4
1486    } else {
1487        8
1488    };
1489    let aligned_size = (flatbuf_size + prefix_size + a) & !a;
1490    let padding_bytes = aligned_size - flatbuf_size - prefix_size;
1491
1492    write_continuation(
1493        &mut writer,
1494        write_options,
1495        (aligned_size - prefix_size) as i32,
1496    )?;
1497
1498    // write the flatbuf
1499    if flatbuf_size > 0 {
1500        writer.write_all(&buffer)?;
1501    }
1502    // write padding
1503    writer.write_all(&PADDING[..padding_bytes])?;
1504
1505    // write arrow data
1506    let body_len = if arrow_data_len > 0 {
1507        write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
1508    } else {
1509        0
1510    };
1511
1512    Ok((aligned_size, body_len))
1513}
1514
1515fn write_body_buffers<W: Write>(
1516    mut writer: W,
1517    data: &[u8],
1518    alignment: u8,
1519) -> Result<usize, ArrowError> {
1520    let len = data.len();
1521    let pad_len = pad_to_alignment(alignment, len);
1522    let total_len = len + pad_len;
1523
1524    // write body buffer
1525    writer.write_all(data)?;
1526    if pad_len > 0 {
1527        writer.write_all(&PADDING[..pad_len])?;
1528    }
1529
1530    writer.flush()?;
1531    Ok(total_len)
1532}
1533
1534/// Writes the continuation marker (if required by the metadata version) and the message
1535/// length to the writer, returning the number of framing bytes written
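///
/// For example, in the V5 format a message length of 16 is framed as the continuation
/// marker `FF FF FF FF` followed by `10 00 00 00` (16 as a little-endian `i32`).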
1536fn write_continuation<W: Write>(
1537    mut writer: W,
1538    write_options: &IpcWriteOptions,
1539    total_len: i32,
1540) -> Result<usize, ArrowError> {
1541    let mut written = 8;
1542
1543    // the version of the writer determines whether continuation markers should be added
1544    match write_options.metadata_version {
1545        crate::MetadataVersion::V1 | crate::MetadataVersion::V2 | crate::MetadataVersion::V3 => {
1546            unreachable!("IpcWriteOptions cannot be created for metadata versions V1-V3")
1547        }
1548        crate::MetadataVersion::V4 => {
1549            if !write_options.write_legacy_ipc_format {
1550                // v0.15.0 format
1551                writer.write_all(&CONTINUATION_MARKER)?;
1552                written = 4;
1553            }
1554            writer.write_all(&total_len.to_le_bytes()[..])?;
1555        }
1556        crate::MetadataVersion::V5 => {
1557            // write continuation marker and message length
1558            writer.write_all(&CONTINUATION_MARKER)?;
1559            writer.write_all(&total_len.to_le_bytes()[..])?;
1560        }
1561        z => panic!("Unsupported crate::MetadataVersion {z:?}"),
1562    };
1563
1564    writer.flush()?;
1565
1566    Ok(written)
1567}
1568
1569/// In V4, only null types have no validity bitmap.
1570/// In V5 and later, null, union, and run-end encoded types
1571/// have no validity bitmap.
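///
/// For example, an `Int32` array always gets a validity bitmap slot, while a `Null`
/// array never does, regardless of the metadata version.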
1572fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) -> bool {
1573    if write_options.metadata_version < crate::MetadataVersion::V5 {
1574        !matches!(data_type, DataType::Null)
1575    } else {
1576        !matches!(
1577            data_type,
1578            DataType::Null | DataType::Union(_, _) | DataType::RunEndEncoded(_, _)
1579        )
1580    }
1581}
1582
1583/// Whether to truncate the buffer
1584#[inline]
1585fn buffer_need_truncate(
1586    array_offset: usize,
1587    buffer: &Buffer,
1588    spec: &BufferSpec,
1589    min_length: usize,
1590) -> bool {
1591    spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
1592}
1593
1594/// Returns the byte width for a `BufferSpec::FixedWidth` spec, or 0 for any other spec.
1595#[inline]
1596fn get_buffer_element_width(spec: &BufferSpec) -> usize {
1597    match spec {
1598        BufferSpec::FixedWidth { byte_width, .. } => *byte_width,
1599        _ => 0,
1600    }
1601}
1602
1603/// Common functionality for re-encoding offsets. Returns the new offsets as well as
1604/// original start offset and length for use in slicing child data.
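///
/// For example, a slice of length 2 whose offsets are `[3, 5, 8]` is re-encoded as
/// `[0, 2, 5]`, with a returned start offset of 3 and a length of 5.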
1605fn reencode_offsets<O: OffsetSizeTrait>(
1606    offsets: &Buffer,
1607    data: &ArrayData,
1608) -> (Buffer, usize, usize) {
1609    let offsets_slice: &[O] = offsets.typed_data::<O>();
1610    let offset_slice = &offsets_slice[data.offset()..data.offset() + data.len() + 1];
1611
1612    let start_offset = offset_slice.first().unwrap();
1613    let end_offset = offset_slice.last().unwrap();
1614
1615    let offsets = match start_offset.as_usize() {
1616        0 => {
1617            let size = size_of::<O>();
1618            offsets.slice_with_length(data.offset() * size, (data.len() + 1) * size)
1619        }
1620        _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
1621    };
1622
1623    let start_offset = start_offset.as_usize();
1624    let end_offset = end_offset.as_usize();
1625
1626    (offsets, start_offset, end_offset - start_offset)
1627}
1628
1629/// Returns the values and offsets [`Buffer`] for a ByteArray with offset type `O`
1630///
1631/// In particular, this handles re-encoding the offsets if they don't start at `0`,
1632/// slicing the values buffer as appropriate. This helps reduce the encoded
1633/// size of sliced arrays, as values that have been sliced away are not encoded
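///
/// For example, slicing a `Utf8` array containing `["a", "bb", "ccc"]` to its last two
/// values yields re-encoded offsets `[0, 2, 5]` and a values buffer containing `"bbccc"`.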
1634fn get_byte_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Buffer) {
1635    if data.is_empty() {
1636        return (MutableBuffer::new(0).into(), MutableBuffer::new(0).into());
1637    }
1638
1639    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1640    let values = data.buffers()[1].slice_with_length(original_start_offset, len);
1641    (offsets, values)
1642}
1643
1644/// Similar logic as [`get_byte_array_buffers()`] but slices the child array instead
1645/// of a values buffer.
1646fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, ArrayData) {
1647    if data.is_empty() {
1648        return (
1649            MutableBuffer::new(0).into(),
1650            data.child_data()[0].slice(0, 0),
1651        );
1652    }
1653
1654    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1655    let child_data = data.child_data()[0].slice(original_start_offset, len);
1656    (offsets, child_data)
1657}
1658
1659/// Write array data to a vector of bytes
1660#[allow(clippy::too_many_arguments)]
1661fn write_array_data(
1662    array_data: &ArrayData,
1663    buffers: &mut Vec<crate::Buffer>,
1664    arrow_data: &mut Vec<u8>,
1665    nodes: &mut Vec<crate::FieldNode>,
1666    offset: i64,
1667    num_rows: usize,
1668    null_count: usize,
1669    compression_codec: Option<CompressionCodec>,
1670    write_options: &IpcWriteOptions,
1671) -> Result<i64, ArrowError> {
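    // The overall flow is:
    //   1. record a FieldNode (length and null count) for this array,
    //   2. write the validity bitmap if the type has one,
    //   3. write the type-specific buffers, truncating/re-encoding sliced data where possible,
    //   4. recurse into child data for nested types.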
1672    let mut offset = offset;
1673    if !matches!(array_data.data_type(), DataType::Null) {
1674        nodes.push(crate::FieldNode::new(num_rows as i64, null_count as i64));
1675    } else {
1676        // A NullArray's null_count equals its len, but the `null_count` passed in is from ArrayData,
1677        // where null_count is always 0.
1678        nodes.push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
1679    }
1680    if has_validity_bitmap(array_data.data_type(), write_options) {
1681        // write null buffer if exists
1682        let null_buffer = match array_data.nulls() {
1683            None => {
1684                // create a buffer and fill it with valid bits
1685                let num_bytes = bit_util::ceil(num_rows, 8);
1686                let buffer = MutableBuffer::new(num_bytes);
1687                let buffer = buffer.with_bitset(num_bytes, true);
1688                buffer.into()
1689            }
1690            Some(buffer) => buffer.inner().sliced(),
1691        };
1692
1693        offset = write_buffer(
1694            null_buffer.as_slice(),
1695            buffers,
1696            arrow_data,
1697            offset,
1698            compression_codec,
1699            write_options.alignment,
1700        )?;
1701    }
1702
1703    let data_type = array_data.data_type();
1704    if matches!(data_type, DataType::Binary | DataType::Utf8) {
1705        let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
1706        for buffer in [offsets, values] {
1707            offset = write_buffer(
1708                buffer.as_slice(),
1709                buffers,
1710                arrow_data,
1711                offset,
1712                compression_codec,
1713                write_options.alignment,
1714            )?;
1715        }
1716    } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
1717        // Slicing the views buffer is safe and easy, but pruning unneeded data buffers is much
1718        // more nuanced, since it is hard to prove that no views reference the pruned buffers.
1719        //
1720        // The current implementation serializes the raw arrays as given and does not try to
1721        // optimize anything. Users who want to "compact" the arrays prior to sending them over
1722        // IPC should consider the gc API suggested in #5513.
1723        for buffer in array_data.buffers() {
1724            offset = write_buffer(
1725                buffer.as_slice(),
1726                buffers,
1727                arrow_data,
1728                offset,
1729                compression_codec,
1730                write_options.alignment,
1731            )?;
1732        }
1733    } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
1734        let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
1735        for buffer in [offsets, values] {
1736            offset = write_buffer(
1737                buffer.as_slice(),
1738                buffers,
1739                arrow_data,
1740                offset,
1741                compression_codec,
1742                write_options.alignment,
1743            )?;
1744        }
1745    } else if DataType::is_numeric(data_type)
1746        || DataType::is_temporal(data_type)
1747        || matches!(
1748            array_data.data_type(),
1749            DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
1750        )
1751    {
1752        // Truncate values
1753        assert_eq!(array_data.buffers().len(), 1);
1754
1755        let buffer = &array_data.buffers()[0];
1756        let layout = layout(data_type);
1757        let spec = &layout.buffers[0];
1758
1759        let byte_width = get_buffer_element_width(spec);
1760        let min_length = array_data.len() * byte_width;
1761        let buffer_slice = if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
1762            let byte_offset = array_data.offset() * byte_width;
1763            let buffer_length = min(min_length, buffer.len() - byte_offset);
1764            &buffer.as_slice()[byte_offset..(byte_offset + buffer_length)]
1765        } else {
1766            buffer.as_slice()
1767        };
1768        offset = write_buffer(
1769            buffer_slice,
1770            buffers,
1771            arrow_data,
1772            offset,
1773            compression_codec,
1774            write_options.alignment,
1775        )?;
1776    } else if matches!(data_type, DataType::Boolean) {
1777        // Bools are special because the payload (= 1 bit) is smaller than the physical container elements (= bytes).
1778        // The array data may not start at the physical boundary of the underlying buffer, so we need to shift bits around.
1779        assert_eq!(array_data.buffers().len(), 1);
1780
1781        let buffer = &array_data.buffers()[0];
1782        let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
1783        offset = write_buffer(
1784            &buffer,
1785            buffers,
1786            arrow_data,
1787            offset,
1788            compression_codec,
1789            write_options.alignment,
1790        )?;
1791    } else if matches!(
1792        data_type,
1793        DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _)
1794    ) {
1795        assert_eq!(array_data.buffers().len(), 1);
1796        assert_eq!(array_data.child_data().len(), 1);
1797
1798        // Truncate offsets and the child data to avoid writing unnecessary data
1799        let (offsets, sliced_child_data) = match data_type {
1800            DataType::List(_) => get_list_array_buffers::<i32>(array_data),
1801            DataType::Map(_, _) => get_list_array_buffers::<i32>(array_data),
1802            DataType::LargeList(_) => get_list_array_buffers::<i64>(array_data),
1803            _ => unreachable!(),
1804        };
1805        offset = write_buffer(
1806            offsets.as_slice(),
1807            buffers,
1808            arrow_data,
1809            offset,
1810            compression_codec,
1811            write_options.alignment,
1812        )?;
1813        offset = write_array_data(
1814            &sliced_child_data,
1815            buffers,
1816            arrow_data,
1817            nodes,
1818            offset,
1819            sliced_child_data.len(),
1820            sliced_child_data.null_count(),
1821            compression_codec,
1822            write_options,
1823        )?;
1824        return Ok(offset);
1825    } else if let DataType::FixedSizeList(_, fixed_size) = data_type {
1826        assert_eq!(array_data.child_data().len(), 1);
1827        let fixed_size = *fixed_size as usize;
1828
1829        let child_offset = array_data.offset() * fixed_size;
1830        let child_length = array_data.len() * fixed_size;
1831        let child_data = array_data.child_data()[0].slice(child_offset, child_length);
1832
1833        offset = write_array_data(
1834            &child_data,
1835            buffers,
1836            arrow_data,
1837            nodes,
1838            offset,
1839            child_data.len(),
1840            child_data.null_count(),
1841            compression_codec,
1842            write_options,
1843        )?;
1844        return Ok(offset);
1845    } else {
1846        for buffer in array_data.buffers() {
1847            offset = write_buffer(
1848                buffer,
1849                buffers,
1850                arrow_data,
1851                offset,
1852                compression_codec,
1853                write_options.alignment,
1854            )?;
1855        }
1856    }
1857
1858    match array_data.data_type() {
1859        DataType::Dictionary(_, _) => {}
1860        DataType::RunEndEncoded(_, _) => {
1861            // unslice the run encoded array.
1862            let arr = unslice_run_array(array_data.clone())?;
1863            // recursively write out nested structures
1864            for data_ref in arr.child_data() {
1865                // write the nested data (e.g list data)
1866                offset = write_array_data(
1867                    data_ref,
1868                    buffers,
1869                    arrow_data,
1870                    nodes,
1871                    offset,
1872                    data_ref.len(),
1873                    data_ref.null_count(),
1874                    compression_codec,
1875                    write_options,
1876                )?;
1877            }
1878        }
1879        _ => {
1880            // recursively write out nested structures
1881            for data_ref in array_data.child_data() {
1882                // write the nested data (e.g list data)
1883                offset = write_array_data(
1884                    data_ref,
1885                    buffers,
1886                    arrow_data,
1887                    nodes,
1888                    offset,
1889                    data_ref.len(),
1890                    data_ref.null_count(),
1891                    compression_codec,
1892                    write_options,
1893                )?;
1894            }
1895        }
1896    }
1897    Ok(offset)
1898}
1899
1900/// Writes a buffer into `arrow_data`, a vector of bytes, and adds its
1901/// [`crate::Buffer`] descriptor to `buffers`. Returns the new offset in `arrow_data`.
1902///
1904/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
1905/// Each constituent buffer is first compressed with the indicated
1906/// compressor, and then written with the uncompressed length in the first 8
1907/// bytes as a 64-bit little-endian signed integer followed by the compressed
1908/// buffer bytes (and then padding as required by the protocol). The
1909/// uncompressed length may be set to -1 to indicate that the data that
1910/// follows is not compressed, which can be useful for cases where
1911/// compression does not yield appreciable savings.
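///
/// As a worked example with no compression and 64-byte alignment: writing a 10-byte
/// buffer at offset 0 appends the 10 bytes plus 54 bytes of zero padding to
/// `arrow_data`, records a `crate::Buffer` with offset 0 and length 10, and returns
/// the new offset 64.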
1912fn write_buffer(
1913    buffer: &[u8],                    // input
1914    buffers: &mut Vec<crate::Buffer>, // output buffer descriptors
1915    arrow_data: &mut Vec<u8>,         // output stream
1916    offset: i64,                      // current output stream offset
1917    compression_codec: Option<CompressionCodec>,
1918    alignment: u8,
1919) -> Result<i64, ArrowError> {
1920    let len: i64 = match compression_codec {
1921        Some(compressor) => compressor.compress_to_vec(buffer, arrow_data)?,
1922        None => {
1923            arrow_data.extend_from_slice(buffer);
1924            buffer.len()
1925        }
1926    }
1927    .try_into()
1928    .map_err(|e| {
1929        ArrowError::InvalidArgumentError(format!("Could not convert compressed size to i64: {e}"))
1930    })?;
1931
1932    // make new index entry
1933    buffers.push(crate::Buffer::new(offset, len));
1934    // padding and make offset aligned
1935    let pad_len = pad_to_alignment(alignment, len as usize);
1936    arrow_data.extend_from_slice(&PADDING[..pad_len]);
1937
1938    Ok(offset + len + (pad_len as i64))
1939}
1940
1941const PADDING: [u8; 64] = [0; 64];
1942
1943/// Returns the number of padding bytes needed to round `len` up to the next multiple of `alignment`
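///
/// For example, with `alignment = 8`: a length of 5 needs 3 bytes of padding, while a
/// length of 8 needs none.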
1944#[inline]
1945fn pad_to_alignment(alignment: u8, len: usize) -> usize {
1946    let a = usize::from(alignment - 1);
1947    ((len + a) & !a) - len
1948}
1949
1950#[cfg(test)]
1951mod tests {
1952    use std::hash::Hasher;
1953    use std::io::Cursor;
1954    use std::io::Seek;
1955
1956    use arrow_array::builder::FixedSizeListBuilder;
1957    use arrow_array::builder::Float32Builder;
1958    use arrow_array::builder::Int64Builder;
1959    use arrow_array::builder::MapBuilder;
1960    use arrow_array::builder::UnionBuilder;
1961    use arrow_array::builder::{GenericListBuilder, ListBuilder, StringBuilder};
1962    use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
1963    use arrow_array::types::*;
1964    use arrow_buffer::ScalarBuffer;
1965
1966    use crate::convert::fb_to_schema;
1967    use crate::reader::*;
1968    use crate::root_as_footer;
1969    use crate::MetadataVersion;
1970
1971    use super::*;
1972
1973    fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
1974        let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
1975        writer.write(rb).unwrap();
1976        writer.finish().unwrap();
1977        writer.into_inner().unwrap()
1978    }
1979
1980    fn deserialize_file(bytes: Vec<u8>) -> RecordBatch {
1981        let mut reader = FileReader::try_new(Cursor::new(bytes), None).unwrap();
1982        reader.next().unwrap().unwrap()
1983    }
1984
1985    fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
1986        // Use 8-byte alignment so that the various `truncate_*` tests can be compactly written,
1987        // without needing to construct a giant array to spill over the 64-byte default alignment
1988        // boundary.
1989        const IPC_ALIGNMENT: usize = 8;
1990
1991        let mut stream_writer = StreamWriter::try_new_with_options(
1992            vec![],
1993            record.schema_ref(),
1994            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
1995        )
1996        .unwrap();
1997        stream_writer.write(record).unwrap();
1998        stream_writer.finish().unwrap();
1999        stream_writer.into_inner().unwrap()
2000    }
2001
2002    fn deserialize_stream(bytes: Vec<u8>) -> RecordBatch {
2003        let mut stream_reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
2004        stream_reader.next().unwrap().unwrap()
2005    }
2006
2007    #[test]
2008    #[cfg(feature = "lz4")]
2009    fn test_write_empty_record_batch_lz4_compression() {
2010        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2011        let values: Vec<Option<i32>> = vec![];
2012        let array = Int32Array::from(values);
2013        let record_batch =
2014            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2015
2016        let mut file = tempfile::tempfile().unwrap();
2017
2018        {
2019            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2020                .unwrap()
2021                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
2022                .unwrap();
2023
2024            let mut writer =
2025                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2026            writer.write(&record_batch).unwrap();
2027            writer.finish().unwrap();
2028        }
2029        file.rewind().unwrap();
2030        {
2031            // read file
2032            let reader = FileReader::try_new(file, None).unwrap();
2033            for read_batch in reader {
2034                read_batch
2035                    .unwrap()
2036                    .columns()
2037                    .iter()
2038                    .zip(record_batch.columns())
2039                    .for_each(|(a, b)| {
2040                        assert_eq!(a.data_type(), b.data_type());
2041                        assert_eq!(a.len(), b.len());
2042                        assert_eq!(a.null_count(), b.null_count());
2043                    });
2044            }
2045        }
2046    }
2047
2048    #[test]
2049    #[cfg(feature = "lz4")]
2050    fn test_write_file_with_lz4_compression() {
2051        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2052        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
2053        let array = Int32Array::from(values);
2054        let record_batch =
2055            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2056
2057        let mut file = tempfile::tempfile().unwrap();
2058        {
2059            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2060                .unwrap()
2061                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
2062                .unwrap();
2063
2064            let mut writer =
2065                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2066            writer.write(&record_batch).unwrap();
2067            writer.finish().unwrap();
2068        }
2069        file.rewind().unwrap();
2070        {
2071            // read file
2072            let reader = FileReader::try_new(file, None).unwrap();
2073            for read_batch in reader {
2074                read_batch
2075                    .unwrap()
2076                    .columns()
2077                    .iter()
2078                    .zip(record_batch.columns())
2079                    .for_each(|(a, b)| {
2080                        assert_eq!(a.data_type(), b.data_type());
2081                        assert_eq!(a.len(), b.len());
2082                        assert_eq!(a.null_count(), b.null_count());
2083                    });
2084            }
2085        }
2086    }
2087
2088    #[test]
2089    #[cfg(feature = "zstd")]
2090    fn test_write_file_with_zstd_compression() {
2091        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2092        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
2093        let array = Int32Array::from(values);
2094        let record_batch =
2095            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2096        let mut file = tempfile::tempfile().unwrap();
2097        {
2098            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2099                .unwrap()
2100                .try_with_compression(Some(crate::CompressionType::ZSTD))
2101                .unwrap();
2102
2103            let mut writer =
2104                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2105            writer.write(&record_batch).unwrap();
2106            writer.finish().unwrap();
2107        }
2108        file.rewind().unwrap();
2109        {
2110            // read file
2111            let reader = FileReader::try_new(file, None).unwrap();
2112            for read_batch in reader {
2113                read_batch
2114                    .unwrap()
2115                    .columns()
2116                    .iter()
2117                    .zip(record_batch.columns())
2118                    .for_each(|(a, b)| {
2119                        assert_eq!(a.data_type(), b.data_type());
2120                        assert_eq!(a.len(), b.len());
2121                        assert_eq!(a.null_count(), b.null_count());
2122                    });
2123            }
2124        }
2125    }
2126
2127    #[test]
2128    fn test_write_file() {
2129        let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
2130        let values: Vec<Option<u32>> = vec![
2131            Some(999),
2132            None,
2133            Some(235),
2134            Some(123),
2135            None,
2136            None,
2137            None,
2138            None,
2139            None,
2140        ];
2141        let array1 = UInt32Array::from(values);
2142        let batch =
2143            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array1) as ArrayRef])
2144                .unwrap();
2145        let mut file = tempfile::tempfile().unwrap();
2146        {
2147            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2148
2149            writer.write(&batch).unwrap();
2150            writer.finish().unwrap();
2151        }
2152        file.rewind().unwrap();
2153
2154        {
2155            let mut reader = FileReader::try_new(file, None).unwrap();
2156            while let Some(Ok(read_batch)) = reader.next() {
2157                read_batch
2158                    .columns()
2159                    .iter()
2160                    .zip(batch.columns())
2161                    .for_each(|(a, b)| {
2162                        assert_eq!(a.data_type(), b.data_type());
2163                        assert_eq!(a.len(), b.len());
2164                        assert_eq!(a.null_count(), b.null_count());
2165                    });
2166            }
2167        }
2168    }
2169
2170    fn write_null_file(options: IpcWriteOptions) {
2171        let schema = Schema::new(vec![
2172            Field::new("nulls", DataType::Null, true),
2173            Field::new("int32s", DataType::Int32, false),
2174            Field::new("nulls2", DataType::Null, true),
2175            Field::new("f64s", DataType::Float64, false),
2176        ]);
2177        let array1 = NullArray::new(32);
2178        let array2 = Int32Array::from(vec![1; 32]);
2179        let array3 = NullArray::new(32);
2180        let array4 = Float64Array::from(vec![f64::NAN; 32]);
2181        let batch = RecordBatch::try_new(
2182            Arc::new(schema.clone()),
2183            vec![
2184                Arc::new(array1) as ArrayRef,
2185                Arc::new(array2) as ArrayRef,
2186                Arc::new(array3) as ArrayRef,
2187                Arc::new(array4) as ArrayRef,
2188            ],
2189        )
2190        .unwrap();
2191        let mut file = tempfile::tempfile().unwrap();
2192        {
2193            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2194
2195            writer.write(&batch).unwrap();
2196            writer.finish().unwrap();
2197        }
2198
2199        file.rewind().unwrap();
2200
2201        {
2202            let reader = FileReader::try_new(file, None).unwrap();
2203            reader.for_each(|maybe_batch| {
2204                maybe_batch
2205                    .unwrap()
2206                    .columns()
2207                    .iter()
2208                    .zip(batch.columns())
2209                    .for_each(|(a, b)| {
2210                        assert_eq!(a.data_type(), b.data_type());
2211                        assert_eq!(a.len(), b.len());
2212                        assert_eq!(a.null_count(), b.null_count());
2213                    });
2214            });
2215        }
2216    }
2217    #[test]
2218    fn test_write_null_file_v4() {
2219        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2220        write_null_file(IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap());
2221        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V4).unwrap());
2222        write_null_file(IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap());
2223    }
2224
2225    #[test]
2226    fn test_write_null_file_v5() {
2227        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2228        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V5).unwrap());
2229    }
2230
2231    #[test]
2232    fn track_union_nested_dict() {
2233        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2234
2235        let array = Arc::new(inner) as ArrayRef;
2236
2237        // Dict field with id 0
2238        #[allow(deprecated)]
2239        let dctfield = Field::new_dict("dict", array.data_type().clone(), false, 0, false);
2240        let union_fields = [(0, Arc::new(dctfield))].into_iter().collect();
2241
2242        let types = [0, 0, 0].into_iter().collect::<ScalarBuffer<i8>>();
2243        let offsets = [0, 1, 2].into_iter().collect::<ScalarBuffer<i32>>();
2244
2245        let union = UnionArray::try_new(union_fields, types, Some(offsets), vec![array]).unwrap();
2246
2247        let schema = Arc::new(Schema::new(vec![Field::new(
2248            "union",
2249            union.data_type().clone(),
2250            false,
2251        )]));
2252
2253        let gen = IpcDataGenerator {};
2254        let mut dict_tracker = DictionaryTracker::new(false);
2255        gen.schema_to_bytes_with_dictionary_tracker(
2256            &schema,
2257            &mut dict_tracker,
2258            &IpcWriteOptions::default(),
2259        );
2260
2261        let batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
2262
2263        gen.encoded_batch(&batch, &mut dict_tracker, &Default::default())
2264            .unwrap();
2265
2266        // The encoder will assign dict IDs itself to ensure uniqueness and ignore the dict ID in the schema
2267        // so we expect the dict will be keyed to 0
2268        assert!(dict_tracker.written.contains_key(&0));
2269    }
2270
2271    #[test]
2272    fn track_struct_nested_dict() {
2273        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2274
2275        let array = Arc::new(inner) as ArrayRef;
2276
2277        // Dict field with id 2
2278        #[allow(deprecated)]
2279        let dctfield = Arc::new(Field::new_dict(
2280            "dict",
2281            array.data_type().clone(),
2282            false,
2283            2,
2284            false,
2285        ));
2286
2287        let s = StructArray::from(vec![(dctfield, array)]);
2288        let struct_array = Arc::new(s) as ArrayRef;
2289
2290        let schema = Arc::new(Schema::new(vec![Field::new(
2291            "struct",
2292            struct_array.data_type().clone(),
2293            false,
2294        )]));
2295
2296        let gen = IpcDataGenerator {};
2297        let mut dict_tracker = DictionaryTracker::new(false);
2298        gen.schema_to_bytes_with_dictionary_tracker(
2299            &schema,
2300            &mut dict_tracker,
2301            &IpcWriteOptions::default(),
2302        );
2303
2304        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2305
2306        gen.encoded_batch(&batch, &mut dict_tracker, &Default::default())
2307            .unwrap();
2308
2309        assert!(dict_tracker.written.contains_key(&0));
2310    }
2311
2312    fn write_union_file(options: IpcWriteOptions) {
2313        let schema = Schema::new(vec![Field::new_union(
2314            "union",
2315            vec![0, 1],
2316            vec![
2317                Field::new("a", DataType::Int32, false),
2318                Field::new("c", DataType::Float64, false),
2319            ],
2320            UnionMode::Sparse,
2321        )]);
2322        let mut builder = UnionBuilder::with_capacity_sparse(5);
2323        builder.append::<Int32Type>("a", 1).unwrap();
2324        builder.append_null::<Int32Type>("a").unwrap();
2325        builder.append::<Float64Type>("c", 3.0).unwrap();
2326        builder.append_null::<Float64Type>("c").unwrap();
2327        builder.append::<Int32Type>("a", 4).unwrap();
2328        let union = builder.build().unwrap();
2329
2330        let batch =
2331            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(union) as ArrayRef])
2332                .unwrap();
2333
2334        let mut file = tempfile::tempfile().unwrap();
2335        {
2336            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2337
2338            writer.write(&batch).unwrap();
2339            writer.finish().unwrap();
2340        }
2341        file.rewind().unwrap();
2342
2343        {
2344            let reader = FileReader::try_new(file, None).unwrap();
2345            reader.for_each(|maybe_batch| {
2346                maybe_batch
2347                    .unwrap()
2348                    .columns()
2349                    .iter()
2350                    .zip(batch.columns())
2351                    .for_each(|(a, b)| {
2352                        assert_eq!(a.data_type(), b.data_type());
2353                        assert_eq!(a.len(), b.len());
2354                        assert_eq!(a.null_count(), b.null_count());
2355                    });
2356            });
2357        }
2358    }
2359
2360    #[test]
2361    fn test_write_union_file_v4_v5() {
2362        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2363        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2364    }
2365
2366    #[test]
2367    fn test_write_view_types() {
2368        const LONG_TEST_STRING: &str =
2369            "This is a long string to make sure binary view array handles it";
2370        let schema = Schema::new(vec![
2371            Field::new("field1", DataType::BinaryView, true),
2372            Field::new("field2", DataType::Utf8View, true),
2373        ]);
2374        let values: Vec<Option<&[u8]>> = vec![
2375            Some(b"foo"),
2376            Some(b"bar"),
2377            Some(LONG_TEST_STRING.as_bytes()),
2378        ];
2379        let binary_array = BinaryViewArray::from_iter(values);
2380        let utf8_array =
2381            StringViewArray::from_iter(vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]);
2382        let record_batch = RecordBatch::try_new(
2383            Arc::new(schema.clone()),
2384            vec![Arc::new(binary_array), Arc::new(utf8_array)],
2385        )
2386        .unwrap();
2387
2388        let mut file = tempfile::tempfile().unwrap();
2389        {
2390            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2391            writer.write(&record_batch).unwrap();
2392            writer.finish().unwrap();
2393        }
2394        file.rewind().unwrap();
2395        {
2396            let mut reader = FileReader::try_new(&file, None).unwrap();
2397            let read_batch = reader.next().unwrap().unwrap();
2398            read_batch
2399                .columns()
2400                .iter()
2401                .zip(record_batch.columns())
2402                .for_each(|(a, b)| {
2403                    assert_eq!(a, b);
2404                });
2405        }
2406        file.rewind().unwrap();
2407        {
2408            let mut reader = FileReader::try_new(&file, Some(vec![0])).unwrap();
2409            let read_batch = reader.next().unwrap().unwrap();
2410            assert_eq!(read_batch.num_columns(), 1);
2411            let read_array = read_batch.column(0);
2412            let write_array = record_batch.column(0);
2413            assert_eq!(read_array, write_array);
2414        }
2415    }
2416
2417    #[test]
2418    fn truncate_ipc_record_batch() {
2419        fn create_batch(rows: usize) -> RecordBatch {
2420            let schema = Schema::new(vec![
2421                Field::new("a", DataType::Int32, false),
2422                Field::new("b", DataType::Utf8, false),
2423            ]);
2424
2425            let a = Int32Array::from_iter_values(0..rows as i32);
2426            let b = StringArray::from_iter_values((0..rows).map(|i| i.to_string()));
2427
2428            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
2429        }
2430
2431        let big_record_batch = create_batch(65536);
2432
2433        let length = 5;
2434        let small_record_batch = create_batch(length);
2435
2436        let offset = 2;
2437        let record_batch_slice = big_record_batch.slice(offset, length);
2438        assert!(
2439            serialize_stream(&big_record_batch).len() > serialize_stream(&small_record_batch).len()
2440        );
2441        assert_eq!(
2442            serialize_stream(&small_record_batch).len(),
2443            serialize_stream(&record_batch_slice).len()
2444        );
2445
2446        assert_eq!(
2447            deserialize_stream(serialize_stream(&record_batch_slice)),
2448            record_batch_slice
2449        );
2450    }
2451
2452    #[test]
2453    fn truncate_ipc_record_batch_with_nulls() {
2454        fn create_batch() -> RecordBatch {
2455            let schema = Schema::new(vec![
2456                Field::new("a", DataType::Int32, true),
2457                Field::new("b", DataType::Utf8, true),
2458            ]);
2459
2460            let a = Int32Array::from(vec![Some(1), None, Some(1), None, Some(1)]);
2461            let b = StringArray::from(vec![None, Some("a"), Some("a"), None, Some("a")]);
2462
2463            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
2464        }
2465
2466        let record_batch = create_batch();
2467        let record_batch_slice = record_batch.slice(1, 2);
2468        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2469
2470        assert!(
2471            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2472        );
2473
2474        assert!(deserialized_batch.column(0).is_null(0));
2475        assert!(deserialized_batch.column(0).is_valid(1));
2476        assert!(deserialized_batch.column(1).is_valid(0));
2477        assert!(deserialized_batch.column(1).is_valid(1));
2478
2479        assert_eq!(record_batch_slice, deserialized_batch);
2480    }
2481
2482    #[test]
2483    fn truncate_ipc_dictionary_array() {
2484        fn create_batch() -> RecordBatch {
2485            let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
2486                .into_iter()
2487                .collect();
2488            let keys: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
2489
2490            let array = DictionaryArray::new(keys, Arc::new(values));
2491
2492            let schema = Schema::new(vec![Field::new("dict", array.data_type().clone(), true)]);
2493
2494            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
2495        }
2496
2497        let record_batch = create_batch();
2498        let record_batch_slice = record_batch.slice(1, 2);
2499        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2500
2501        assert!(
2502            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2503        );
2504
2505        assert!(deserialized_batch.column(0).is_valid(0));
2506        assert!(deserialized_batch.column(0).is_null(1));
2507
2508        assert_eq!(record_batch_slice, deserialized_batch);
2509    }
2510
2511    #[test]
2512    fn truncate_ipc_struct_array() {
2513        fn create_batch() -> RecordBatch {
2514            let strings: StringArray = [Some("foo"), None, Some("bar"), Some("baz")]
2515                .into_iter()
2516                .collect();
2517            let ints: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
2518
2519            let struct_array = StructArray::from(vec![
2520                (
2521                    Arc::new(Field::new("s", DataType::Utf8, true)),
2522                    Arc::new(strings) as ArrayRef,
2523                ),
2524                (
2525                    Arc::new(Field::new("c", DataType::Int32, true)),
2526                    Arc::new(ints) as ArrayRef,
2527                ),
2528            ]);
2529
2530            let schema = Schema::new(vec![Field::new(
2531                "struct_array",
2532                struct_array.data_type().clone(),
2533                true,
2534            )]);
2535
2536            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(struct_array)]).unwrap()
2537        }
2538
2539        let record_batch = create_batch();
2540        let record_batch_slice = record_batch.slice(1, 2);
2541        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2542
2543        assert!(
2544            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2545        );
2546
2547        let structs = deserialized_batch
2548            .column(0)
2549            .as_any()
2550            .downcast_ref::<StructArray>()
2551            .unwrap();
2552
2553        assert!(structs.column(0).is_null(0));
2554        assert!(structs.column(0).is_valid(1));
2555        assert!(structs.column(1).is_valid(0));
2556        assert!(structs.column(1).is_null(1));
2557        assert_eq!(record_batch_slice, deserialized_batch);
2558    }
2559
2560    #[test]
2561    fn truncate_ipc_string_array_with_all_empty_string() {
2562        fn create_batch() -> RecordBatch {
2563            let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
2564            let a = StringArray::from(vec![Some(""), Some(""), Some(""), Some(""), Some("")]);
2565            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap()
2566        }
2567
2568        let record_batch = create_batch();
2569        let record_batch_slice = record_batch.slice(0, 1);
2570        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2571
2572        assert!(
2573            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2574        );
2575        assert_eq!(record_batch_slice, deserialized_batch);
2576    }
2577
2578    #[test]
2579    fn test_stream_writer_writes_array_slice() {
2580        let array = UInt32Array::from(vec![Some(1), Some(2), Some(3)]);
2581        assert_eq!(
2582            vec![Some(1), Some(2), Some(3)],
2583            array.iter().collect::<Vec<_>>()
2584        );
2585
2586        let sliced = array.slice(1, 2);
2587        assert_eq!(vec![Some(2), Some(3)], sliced.iter().collect::<Vec<_>>());
2588
2589        let batch = RecordBatch::try_new(
2590            Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)])),
2591            vec![Arc::new(sliced)],
2592        )
2593        .expect("new batch");
2594
2595        let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
2596        writer.write(&batch).expect("write");
2597        let outbuf = writer.into_inner().expect("inner");
2598
2599        let mut reader = StreamReader::try_new(&outbuf[..], None).expect("new reader");
2600        let read_batch = reader.next().unwrap().expect("read batch");
2601
2602        let read_array: &UInt32Array = read_batch.column(0).as_primitive();
2603        assert_eq!(
2604            vec![Some(2), Some(3)],
2605            read_array.iter().collect::<Vec<_>>()
2606        );
2607    }
2608
2609    #[test]
2610    fn test_large_slice_uint32() {
2611        ensure_roundtrip(Arc::new(UInt32Array::from_iter((0..8000).map(|i| {
2612            if i % 2 == 0 {
2613                Some(i)
2614            } else {
2615                None
2616            }
2617        }))));
2618    }
2619
2620    #[test]
2621    fn test_large_slice_string() {
2622        let strings: Vec<_> = (0..8000)
2623            .map(|i| {
2624                if i % 2 == 0 {
2625                    Some(format!("value{i}"))
2626                } else {
2627                    None
2628                }
2629            })
2630            .collect();
2631
2632        ensure_roundtrip(Arc::new(StringArray::from(strings)));
2633    }
2634
2635    #[test]
2636    fn test_large_slice_string_list() {
2637        let mut ls = ListBuilder::new(StringBuilder::new());
2638
2639        let mut s = String::new();
2640        for row_number in 0..8000 {
2641            if row_number % 2 == 0 {
2642                for list_element in 0..1000 {
2643                    s.clear();
2644                    use std::fmt::Write;
2645                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
2646                    ls.values().append_value(&s);
2647                }
2648                ls.append(true)
2649            } else {
2650                ls.append(false); // null
2651            }
2652        }
2653
2654        ensure_roundtrip(Arc::new(ls.finish()));
2655    }
2656
2657    #[test]
2658    fn test_large_slice_string_list_of_lists() {
2659        // This test exists to exercise reencode_offsets, which looks at both the starting offset
2660        // and the data offset, so we need a dataset where the starting offset is zero but the
2661        // data offset is not.
2662        let mut ls = ListBuilder::new(ListBuilder::new(StringBuilder::new()));
2663
2664        for _ in 0..4000 {
2665            ls.values().append(true);
2666            ls.append(true)
2667        }
2668
2669        let mut s = String::new();
2670        for row_number in 0..4000 {
2671            if row_number % 2 == 0 {
2672                for list_element in 0..1000 {
2673                    s.clear();
2674                    use std::fmt::Write;
2675                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
2676                    ls.values().values().append_value(&s);
2677                }
2678                ls.values().append(true);
2679                ls.append(true)
2680            } else {
2681                ls.append(false); // null
2682            }
2683        }
2684
2685        ensure_roundtrip(Arc::new(ls.finish()));
2686    }
2687
2688    /// Read/write a record batch to a File and Stream and ensure it is the same at the output
2689    fn ensure_roundtrip(array: ArrayRef) {
2690        let num_rows = array.len();
2691        let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
2692        // take off the first element
2693        let sliced_batch = orig_batch.slice(1, num_rows - 1);
2694
2695        let schema = orig_batch.schema();
2696        let stream_data = {
2697            let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
2698            writer.write(&sliced_batch).unwrap();
2699            writer.into_inner().unwrap()
2700        };
2701        let read_batch = {
2702            let projection = None;
2703            let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
2704            reader
2705                .next()
2706                .expect("expect no errors reading batch")
2707                .expect("expect batch")
2708        };
2709        assert_eq!(sliced_batch, read_batch);
2710
2711        let file_data = {
2712            let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
2713            writer.write(&sliced_batch).unwrap();
2714            writer.into_inner().unwrap().into_inner().unwrap()
2715        };
2716        let read_batch = {
2717            let projection = None;
2718            let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
2719            reader
2720                .next()
2721                .expect("expect no errors reading batch")
2722                .expect("expect batch")
2723        };
2724        assert_eq!(sliced_batch, read_batch);
2725
2727    }
2728
2729    #[test]
2730    fn encode_bools_slice() {
2731        // Test case for https://github.com/apache/arrow-rs/issues/3496
2732        assert_bool_roundtrip([true, false], 1, 1);
2733
2734        // slice somewhere in the middle
2735        assert_bool_roundtrip(
2736            [
2737                true, false, true, true, false, false, true, true, true, false, false, false, true,
2738                true, true, true, false, false, false, false, true, true, true, true, true, false,
2739                false, false, false, false,
2740            ],
2741            13,
2742            17,
2743        );
2744
2745        // start at byte boundary, end in the middle
2746        assert_bool_roundtrip(
2747            [
2748                true, false, true, true, false, false, true, true, true, false, false, false,
2749            ],
2750            8,
2751            2,
2752        );
2753
2754        // start and stop at byte boundary
2755        assert_bool_roundtrip(
2756            [
2757                true, false, true, true, false, false, true, true, true, false, false, false, true,
2758                true, true, true, true, false, false, false, false, false,
2759            ],
2760            8,
2761            8,
2762        );
2763    }
2764
2765    fn assert_bool_roundtrip<const N: usize>(bools: [bool; N], offset: usize, length: usize) {
2766        let val_bool_field = Field::new("val", DataType::Boolean, false);
2767
2768        let schema = Arc::new(Schema::new(vec![val_bool_field]));
2769
2770        let bools = BooleanArray::from(bools.to_vec());
2771
2772        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(bools)]).unwrap();
2773        let batch = batch.slice(offset, length);
2774
2775        let data = serialize_stream(&batch);
2776        let batch2 = deserialize_stream(data);
2777        assert_eq!(batch, batch2);
2778    }
2779
2780    #[test]
2781    fn test_run_array_unslice() {
2782        let total_len = 80;
2783        let vals: Vec<Option<i32>> = vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
2784        let repeats: Vec<usize> = vec![3, 4, 1, 2];
2785        let mut input_array: Vec<Option<i32>> = Vec::with_capacity(total_len);
2786        for ix in 0_usize..32 {
2787            let repeat: usize = repeats[ix % repeats.len()];
2788            let val: Option<i32> = vals[ix % vals.len()];
2789            input_array.resize(input_array.len() + repeat, val);
2790        }
2791
2792        // Encode the input_array to run array
2793        let mut builder =
2794            PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
2795        builder.extend(input_array.iter().copied());
2796        let run_array = builder.finish();
2797
2798        // test for all slice lengths.
2799        for slice_len in 1..=total_len {
2800            // test for offset = 0, slice length = slice_len
2801            let sliced_run_array: RunArray<Int16Type> =
2802                run_array.slice(0, slice_len).into_data().into();
2803
2804            // Create unsliced run array.
2805            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
2806            let typed = unsliced_run_array
2807                .downcast::<PrimitiveArray<Int32Type>>()
2808                .unwrap();
2809            let expected: Vec<Option<i32>> = input_array.iter().take(slice_len).copied().collect();
2810            let actual: Vec<Option<i32>> = typed.into_iter().collect();
2811            assert_eq!(expected, actual);
2812
2813            // test for offset = total_len - slice_len, length = slice_len
2814            let sliced_run_array: RunArray<Int16Type> = run_array
2815                .slice(total_len - slice_len, slice_len)
2816                .into_data()
2817                .into();
2818
2819            // Create unsliced run array.
2820            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
2821            let typed = unsliced_run_array
2822                .downcast::<PrimitiveArray<Int32Type>>()
2823                .unwrap();
2824            let expected: Vec<Option<i32>> = input_array
2825                .iter()
2826                .skip(total_len - slice_len)
2827                .copied()
2828                .collect();
2829            let actual: Vec<Option<i32>> = typed.into_iter().collect();
2830            assert_eq!(expected, actual);
2831        }
2832    }
2833
2834    fn generate_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
2835        let mut ls = GenericListBuilder::<O, _>::new(UInt32Builder::new());
2836
2837        for i in 0..100_000 {
2838            for value in [i, i, i] {
2839                ls.values().append_value(value);
2840            }
2841            ls.append(true);
2842        }
2843
2844        ls.finish()
2845    }
2846
2847    fn generate_nested_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
2848        let mut ls =
2849            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
2850
2851        for _i in 0..10_000 {
2852            for j in 0..10 {
2853                for value in [j, j, j, j] {
2854                    ls.values().values().append_value(value);
2855                }
2856                ls.values().append(true);
2857            }
2858            ls.append(true);
2859        }
2860
2861        ls.finish()
2862    }
2863
2864    fn generate_nested_list_data_starting_at_zero<O: OffsetSizeTrait>() -> GenericListArray<O> {
2865        let mut ls =
2866            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
2867
2868        for _i in 0..999 {
2869            ls.values().append(true);
2870            ls.append(true);
2871        }
2872
2873        for j in 0..10 {
2874            for value in [j, j, j, j] {
2875                ls.values().values().append_value(value);
2876            }
2877            ls.values().append(true);
2878        }
2879        ls.append(true);
2880
2881        for i in 0..9_000 {
2882            for j in 0..10 {
2883                for value in [i + j, i + j, i + j, i + j] {
2884                    ls.values().values().append_value(value);
2885                }
2886                ls.values().append(true);
2887            }
2888            ls.append(true);
2889        }
2890
2891        ls.finish()
2892    }
2893
2894    fn generate_map_array_data() -> MapArray {
2895        let keys_builder = UInt32Builder::new();
2896        let values_builder = UInt32Builder::new();
2897
2898        let mut builder = MapBuilder::new(None, keys_builder, values_builder);
2899
2900        for i in 0..100_000 {
2901            for _j in 0..3 {
2902                builder.keys().append_value(i);
2903                builder.values().append_value(i * 2);
2904            }
2905            builder.append(true).unwrap();
2906        }
2907
2908        builder.finish()
2909    }
2910
2911    #[test]
2912    fn reencode_offsets_when_first_offset_is_not_zero() {
2913        let original_list = generate_list_data::<i32>();
2914        let original_data = original_list.into_data();
2915        let slice_data = original_data.slice(75, 7);
2916        let (new_offsets, original_start, length) =
2917            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
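        // `generate_list_data` writes 3 child values per list, so the 7 sliced lists start at
        // child offset 75 * 3 = 225 and span 21 values; the offsets are re-encoded to start at 0.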
2918        assert_eq!(
2919            vec![0, 3, 6, 9, 12, 15, 18, 21],
2920            new_offsets.typed_data::<i32>()
2921        );
2922        assert_eq!(225, original_start);
2923        assert_eq!(21, length);
2924    }
2925
2926    #[test]
2927    fn reencode_offsets_when_first_offset_is_zero() {
2928        let mut ls = GenericListBuilder::<i32, _>::new(UInt32Builder::new());
2929        // ls = [[], [35, 42]]
2930        ls.append(true);
2931        ls.values().append_value(35);
2932        ls.values().append_value(42);
2933        ls.append(true);
2934        let original_list = ls.finish();
2935        let original_data = original_list.into_data();
2936
2937        let slice_data = original_data.slice(1, 1);
2938        let (new_offsets, original_start, length) =
2939            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
2940        assert_eq!(vec![0, 2], new_offsets.typed_data::<i32>());
2941        assert_eq!(0, original_start);
2942        assert_eq!(2, length);
2943    }
2944
2945    /// Ensure that serializing and then deserializing both the full and the sliced batch reproduces the original input.
2946    /// Also ensure the serialized sliced version is significantly smaller than the serialized full batch.
2947    fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {
2948        // test both full and sliced versions
2949        let in_sliced = in_batch.slice(999, 1);
2950
2951        let bytes_batch = serialize_file(&in_batch);
2952        let bytes_sliced = serialize_file(&in_sliced);
2953
2954        // serializing 1 row should be significantly smaller than serializing 100,000
2955        assert!(bytes_sliced.len() < (bytes_batch.len() / expected_size_factor));
2956
2957        // ensure both are still valid and equal to originals
2958        let out_batch = deserialize_file(bytes_batch);
2959        assert_eq!(in_batch, out_batch);
2960
2961        let out_sliced = deserialize_file(bytes_sliced);
2962        assert_eq!(in_sliced, out_sliced);
2963    }
2964
2965    #[test]
2966    fn encode_lists() {
2967        let val_inner = Field::new_list_field(DataType::UInt32, true);
2968        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
2969        let schema = Arc::new(Schema::new(vec![val_list_field]));
2970
2971        let values = Arc::new(generate_list_data::<i32>());
2972
2973        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2974        roundtrip_ensure_sliced_smaller(in_batch, 1000);
2975    }
2976
2977    #[test]
2978    fn encode_empty_list() {
2979        let val_inner = Field::new_list_field(DataType::UInt32, true);
2980        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
2981        let schema = Arc::new(Schema::new(vec![val_list_field]));
2982
2983        let values = Arc::new(generate_list_data::<i32>());
2984
2985        let in_batch = RecordBatch::try_new(schema, vec![values])
2986            .unwrap()
2987            .slice(999, 0);
2988        let out_batch = deserialize_file(serialize_file(&in_batch));
2989        assert_eq!(in_batch, out_batch);
2990    }
2991
2992    #[test]
2993    fn encode_large_lists() {
2994        let val_inner = Field::new_list_field(DataType::UInt32, true);
2995        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
2996        let schema = Arc::new(Schema::new(vec![val_list_field]));
2997
2998        let values = Arc::new(generate_list_data::<i64>());
2999
3000        // Ensure both the full and the sliced versions round-trip to the original input;
3001        // also ensure the serialized sliced version is significantly smaller than the full one
3002        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3003        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3004    }
3005
3006    #[test]
3007    fn encode_nested_lists() {
3008        let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
3009        let inner_list_field = Arc::new(Field::new_list_field(DataType::List(inner_int), true));
3010        let list_field = Field::new("val", DataType::List(inner_list_field), true);
3011        let schema = Arc::new(Schema::new(vec![list_field]));
3012
3013        let values = Arc::new(generate_nested_list_data::<i32>());
3014
3015        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3016        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3017    }
3018
3019    #[test]
3020    fn encode_nested_lists_starting_at_zero() {
3021        let inner_int = Arc::new(Field::new("item", DataType::UInt32, true));
3022        let inner_list_field = Arc::new(Field::new("item", DataType::List(inner_int), true));
3023        let list_field = Field::new("val", DataType::List(inner_list_field), true);
3024        let schema = Arc::new(Schema::new(vec![list_field]));
3025
3026        let values = Arc::new(generate_nested_list_data_starting_at_zero::<i32>());
3027
3028        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3029        roundtrip_ensure_sliced_smaller(in_batch, 1);
3030    }
3031
3032    #[test]
3033    fn encode_map_array() {
3034        let keys = Arc::new(Field::new("keys", DataType::UInt32, false));
3035        let values = Arc::new(Field::new("values", DataType::UInt32, true));
3036        let map_field = Field::new_map("map", "entries", keys, values, false, true);
3037        let schema = Arc::new(Schema::new(vec![map_field]));
3038
3039        let values = Arc::new(generate_map_array_data());
3040
3041        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3042        roundtrip_ensure_sliced_smaller(in_batch, 1000);
3043    }
3044
3045    #[test]
3046    fn test_decimal128_alignment16_is_sufficient() {
3047        const IPC_ALIGNMENT: usize = 16;
3048
3049        // Test a bunch of different dimensions to ensure alignment is never an issue.
3050        // For example, if we only test `num_cols = 1` then even with alignment 8 this
3051        // test would _happen_ to pass, even though for different dimensions like
3052        // `num_cols = 2` it would fail.
3053        for num_cols in [1, 2, 3, 17, 50, 73, 99] {
3054            let num_rows = (num_cols * 7 + 11) % 100; // Deterministic swizzle
3055
3056            let mut fields = Vec::new();
3057            let mut arrays = Vec::new();
3058            for i in 0..num_cols {
3059                let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
3060                let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
3061                fields.push(field);
3062                arrays.push(Arc::new(array) as Arc<dyn Array>);
3063            }
3064            let schema = Schema::new(fields);
3065            let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();
3066
3067            let mut writer = FileWriter::try_new_with_options(
3068                Vec::new(),
3069                batch.schema_ref(),
3070                IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
3071            )
3072            .unwrap();
3073            writer.write(&batch).unwrap();
3074            writer.finish().unwrap();
3075
3076            let out: Vec<u8> = writer.into_inner().unwrap();
3077
3078            let buffer = Buffer::from_vec(out);
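            // An Arrow file ends with the footer flatbuffer, a 4-byte footer length, and the
            // 6-byte ARROW1 magic, so the trailer occupies the last 10 bytes.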
3079            let trailer_start = buffer.len() - 10;
3080            let footer_len =
3081                read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
3082            let footer =
3083                root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
3084
3085            let schema = fb_to_schema(footer.schema().unwrap());
3086
3087            // Importantly we set `require_alignment`, checking that 16-byte alignment is sufficient
3088            // for `read_record_batch` later on to read the data in a zero-copy manner.
3089            let decoder =
3090                FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);
3091
3092            let batches = footer.recordBatches().unwrap();
3093
3094            let block = batches.get(0);
3095            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
3096            let data = buffer.slice_with_length(block.offset() as _, block_len);
3097
3098            let batch2 = decoder.read_record_batch(block, &data).unwrap().unwrap();
3099
3100            assert_eq!(batch, batch2);
3101        }
3102    }
3103
3104    #[test]
3105    fn test_decimal128_alignment8_is_unaligned() {
3106        const IPC_ALIGNMENT: usize = 8;
3107
3108        let num_cols = 2;
3109        let num_rows = 1;
3110
3111        let mut fields = Vec::new();
3112        let mut arrays = Vec::new();
3113        for i in 0..num_cols {
3114            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
3115            let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
3116            fields.push(field);
3117            arrays.push(Arc::new(array) as Arc<dyn Array>);
3118        }
3119        let schema = Schema::new(fields);
3120        let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();
3121
3122        let mut writer = FileWriter::try_new_with_options(
3123            Vec::new(),
3124            batch.schema_ref(),
3125            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
3126        )
3127        .unwrap();
3128        writer.write(&batch).unwrap();
3129        writer.finish().unwrap();
3130
3131        let out: Vec<u8> = writer.into_inner().unwrap();
3132
3133        let buffer = Buffer::from_vec(out);
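        // As above, the 4-byte footer length plus the 6-byte ARROW1 magic occupy the last 10 bytes.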
3134        let trailer_start = buffer.len() - 10;
3135        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
3136        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
3137        let schema = fb_to_schema(footer.schema().unwrap());
3138
3139        // Importantly we set `require_alignment`, otherwise the error later is suppressed due to copying
3140        // to an aligned buffer in `ArrayDataBuilder.build_aligned`.
3141        let decoder =
3142            FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);
3143
3144        let batches = footer.recordBatches().unwrap();
3145
3146        let block = batches.get(0);
3147        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
3148        let data = buffer.slice_with_length(block.offset() as _, block_len);
3149
3150        let result = decoder.read_record_batch(block, &data);
3151
3152        let error = result.unwrap_err();
3153        assert_eq!(
3154            error.to_string(),
3155            "Invalid argument error: Misaligned buffers[0] in array of type Decimal128(38, 10), \
3156             offset from expected alignment of 16 by 8"
3157        );
3158    }
3159
3160    #[test]
3161    fn test_flush() {
3162        // We write a schema that is small enough to fit in the BufWriter's buffer without
3163        // being flushed automatically, and then force the write with .flush().
3164        let num_cols = 2;
3165        let mut fields = Vec::new();
3166        let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
3167        for i in 0..num_cols {
3168            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
3169            fields.push(field);
3170        }
3171        let schema = Schema::new(fields);
3172        let inner_stream_writer = BufWriter::with_capacity(1024, Vec::new());
3173        let inner_file_writer = BufWriter::with_capacity(1024, Vec::new());
3174        let mut stream_writer =
3175            StreamWriter::try_new_with_options(inner_stream_writer, &schema, options.clone())
3176                .unwrap();
3177        let mut file_writer =
3178            FileWriter::try_new_with_options(inner_file_writer, &schema, options).unwrap();
3179
3180        let stream_bytes_written_on_new = stream_writer.get_ref().get_ref().len();
3181        let file_bytes_written_on_new = file_writer.get_ref().get_ref().len();
3182        stream_writer.flush().unwrap();
3183        file_writer.flush().unwrap();
3184        let stream_bytes_written_on_flush = stream_writer.get_ref().get_ref().len();
3185        let file_bytes_written_on_flush = file_writer.get_ref().get_ref().len();
3186        let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
3187        // Finishing a stream writes the continuation bytes in MetadataVersion::V5 (4 bytes)
3188        // and then a length of 0 (4 bytes) for a total of 8 bytes.
3189        // Everything before that should have been flushed in the .flush() call (the marker bytes are exercised in the sketch test below).
3190        let expected_stream_flushed_bytes = stream_out.len() - 8;
3191        // A file write is the same as the stream write except for the leading magic string
3192        // ARROW1 plus padding, which is 8 bytes.
3193        let expected_file_flushed_bytes = expected_stream_flushed_bytes + 8;
3194
3195        assert!(
3196            stream_bytes_written_on_new < stream_bytes_written_on_flush,
3197            "this test makes no sense if flush is not actually required"
3198        );
3199        assert!(
3200            file_bytes_written_on_new < file_bytes_written_on_flush,
3201            "this test makes no sense if flush is not actually required"
3202        );
3203        assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
3204        assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
3205    }
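
    // A minimal companion sketch (not part of the original suite) illustrating the 8-byte
    // end-of-stream marker that `test_flush` above reasons about: finishing a stream with the
    // default (metadata V5) options appends the continuation bytes (four 0xFF bytes) followed
    // by a zero message length. The test name and schema here are illustrative only.
    #[test]
    fn test_stream_end_of_stream_marker_sketch() {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
        let mut writer = StreamWriter::try_new(Vec::new(), &schema).unwrap();
        writer.finish().unwrap();
        let out = writer.into_inner().unwrap();
        // The stream should end with the continuation marker followed by a 0_u32 length.
        assert_eq!(&out[out.len() - 8..], &[0xff, 0xff, 0xff, 0xff, 0, 0, 0, 0]);
    }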
3206
3207    #[test]
3208    fn test_roundtrip_list_of_fixed_list() -> Result<(), ArrowError> {
3209        let l1_type =
3210            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, false)), 3);
3211        let l2_type = DataType::List(Arc::new(Field::new("item", l1_type.clone(), false)));
3212
3213        let l0_builder = Float32Builder::new();
3214        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3).with_field(Arc::new(Field::new(
3215            "item",
3216            DataType::Float32,
3217            false,
3218        )));
3219        let mut l2_builder =
3220            ListBuilder::new(l1_builder).with_field(Arc::new(Field::new("item", l1_type, false)));
3221
3222        for point in [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]] {
3223            l2_builder.values().values().append_value(point[0]);
3224            l2_builder.values().values().append_value(point[1]);
3225            l2_builder.values().values().append_value(point[2]);
3226
3227            l2_builder.values().append(true);
3228        }
3229        l2_builder.append(true);
3230
3231        let point = [10., 11., 12.];
3232        l2_builder.values().values().append_value(point[0]);
3233        l2_builder.values().values().append_value(point[1]);
3234        l2_builder.values().values().append_value(point[2]);
3235
3236        l2_builder.values().append(true);
3237        l2_builder.append(true);
3238
3239        let array = Arc::new(l2_builder.finish()) as ArrayRef;
3240
3241        let schema = Arc::new(Schema::new_with_metadata(
3242            vec![Field::new("points", l2_type, false)],
3243            HashMap::default(),
3244        ));
3245
3246        // Test a variety of combinations that include zero and non-zero offsets,
3247        // covering both part of the array and the remainder of it
3248        test_slices(&array, &schema, 0, 1)?;
3249        test_slices(&array, &schema, 0, 2)?;
3250        test_slices(&array, &schema, 1, 1)?;
3251
3252        Ok(())
3253    }
3254
3255    #[test]
3256    fn test_roundtrip_list_of_fixed_list_w_nulls() -> Result<(), ArrowError> {
3257        let l0_builder = Float32Builder::new();
3258        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3);
3259        let mut l2_builder = ListBuilder::new(l1_builder);
3260
3261        for point in [
3262            [Some(1.0), Some(2.0), None],
3263            [Some(4.0), Some(5.0), Some(6.0)],
3264            [None, Some(8.0), Some(9.0)],
3265        ] {
3266            for p in point {
3267                match p {
3268                    Some(p) => l2_builder.values().values().append_value(p),
3269                    None => l2_builder.values().values().append_null(),
3270                }
3271            }
3272
3273            l2_builder.values().append(true);
3274        }
3275        l2_builder.append(true);
3276
3277        let point = [Some(10.), None, None];
3278        for p in point {
3279            match p {
3280                Some(p) => l2_builder.values().values().append_value(p),
3281                None => l2_builder.values().values().append_null(),
3282            }
3283        }
3284
3285        l2_builder.values().append(true);
3286        l2_builder.append(true);
3287
3288        let array = Arc::new(l2_builder.finish()) as ArrayRef;
3289
3290        let schema = Arc::new(Schema::new_with_metadata(
3291            vec![Field::new(
3292                "points",
3293                DataType::List(Arc::new(Field::new(
3294                    "item",
3295                    DataType::FixedSizeList(
3296                        Arc::new(Field::new("item", DataType::Float32, true)),
3297                        3,
3298                    ),
3299                    true,
3300                ))),
3301                true,
3302            )],
3303            HashMap::default(),
3304        ));
3305
3306        // Test a variety of combinations that include zero and non-zero offsets,
3307        // covering both part of the array and the remainder of it
3308        test_slices(&array, &schema, 0, 1)?;
3309        test_slices(&array, &schema, 0, 2)?;
3310        test_slices(&array, &schema, 1, 1)?;
3311
3312        Ok(())
3313    }
3314
3315    fn test_slices(
3316        parent_array: &ArrayRef,
3317        schema: &SchemaRef,
3318        offset: usize,
3319        length: usize,
3320    ) -> Result<(), ArrowError> {
3321        let subarray = parent_array.slice(offset, length);
3322        let original_batch = RecordBatch::try_new(schema.clone(), vec![subarray])?;
3323
3324        let mut bytes = Vec::new();
3325        let mut writer = StreamWriter::try_new(&mut bytes, schema)?;
3326        writer.write(&original_batch)?;
3327        writer.finish()?;
3328
3329        let mut cursor = std::io::Cursor::new(bytes);
3330        let mut reader = StreamReader::try_new(&mut cursor, None)?;
3331        let returned_batch = reader.next().unwrap()?;
3332
3333        assert_eq!(original_batch, returned_batch);
3334
3335        Ok(())
3336    }
3337
3338    #[test]
3339    fn test_roundtrip_fixed_list() -> Result<(), ArrowError> {
3340        let int_builder = Int64Builder::new();
3341        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3)
3342            .with_field(Arc::new(Field::new("item", DataType::Int64, false)));
3343
3344        for point in [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]] {
3345            fixed_list_builder.values().append_value(point[0]);
3346            fixed_list_builder.values().append_value(point[1]);
3347            fixed_list_builder.values().append_value(point[2]);
3348
3349            fixed_list_builder.append(true);
3350        }
3351
3352        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;
3353
3354        let schema = Arc::new(Schema::new_with_metadata(
3355            vec![Field::new(
3356                "points",
3357                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, false)), 3),
3358                false,
3359            )],
3360            HashMap::default(),
3361        ));
3362
3363        // Test a variety of combinations that include zero and non-zero offsets,
3364        // covering both part of the array and the remainder of it
3365        test_slices(&array, &schema, 0, 4)?;
3366        test_slices(&array, &schema, 0, 2)?;
3367        test_slices(&array, &schema, 1, 3)?;
3368        test_slices(&array, &schema, 2, 1)?;
3369
3370        Ok(())
3371    }
3372
3373    #[test]
3374    fn test_roundtrip_fixed_list_w_nulls() -> Result<(), ArrowError> {
3375        let int_builder = Int64Builder::new();
3376        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3);
3377
3378        for point in [
3379            [Some(1), Some(2), None],
3380            [Some(4), Some(5), Some(6)],
3381            [None, Some(8), Some(9)],
3382            [Some(10), None, None],
3383        ] {
3384            for p in point {
3385                match p {
3386                    Some(p) => fixed_list_builder.values().append_value(p),
3387                    None => fixed_list_builder.values().append_null(),
3388                }
3389            }
3390
3391            fixed_list_builder.append(true);
3392        }
3393
3394        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;
3395
3396        let schema = Arc::new(Schema::new_with_metadata(
3397            vec![Field::new(
3398                "points",
3399                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 3),
3400                true,
3401            )],
3402            HashMap::default(),
3403        ));
3404
3405        // Test a variety of combinations that include zero and non-zero offsets,
3406        // covering both part of the array and the remainder of it
3407        test_slices(&array, &schema, 0, 4)?;
3408        test_slices(&array, &schema, 0, 2)?;
3409        test_slices(&array, &schema, 1, 3)?;
3410        test_slices(&array, &schema, 2, 1)?;
3411
3412        Ok(())
3413    }
3414
3415    #[test]
3416    fn test_metadata_encoding_ordering() {
3417        fn create_hash() -> u64 {
3418            let metadata: HashMap<String, String> = [
3419                ("a", "1"), //
3420                ("b", "2"), //
3421                ("c", "3"), //
3422                ("d", "4"), //
3423                ("e", "5"), //
3424            ]
3425            .into_iter()
3426            .map(|(k, v)| (k.to_owned(), v.to_owned()))
3427            .collect();
3428
3429            // Set metadata on both the schema and a field within it.
3430            let schema = Arc::new(
3431                Schema::new(vec![
3432                    Field::new("a", DataType::Int64, true).with_metadata(metadata.clone())
3433                ])
3434                .with_metadata(metadata)
3435                .clone(),
3436            );
3437            let batch = RecordBatch::new_empty(schema.clone());
3438
3439            let mut bytes = Vec::new();
3440            let mut w = StreamWriter::try_new(&mut bytes, batch.schema_ref()).unwrap();
3441            w.write(&batch).unwrap();
3442            w.finish().unwrap();
3443
3444            let mut h = std::hash::DefaultHasher::new();
3445            h.write(&bytes);
3446            h.finish()
3447        }
3448
3449        let expected = create_hash();
3450
3451        // Since HashMap iteration order is not deterministic and we cannot specify our
3452        // own Hasher for the map used for metadata, run the above code 20 times and
3453        // verify the serialized bytes do not change. This is not a perfect test, but it
3454        // should be good enough.
3455        let all_passed = (0..20).all(|_| create_hash() == expected);
3456        assert!(all_passed);
3457    }
3458}