parquet/arrow/arrow_writer/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Contains a writer that writes Arrow data into Parquet files.
19
20use bytes::Bytes;
21use std::io::{Read, Write};
22use std::iter::Peekable;
23use std::slice::Iter;
24use std::sync::{Arc, Mutex};
25use std::vec::IntoIter;
26use thrift::protocol::TCompactOutputProtocol;
27
28use arrow_array::cast::AsArray;
29use arrow_array::types::*;
30use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
31use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};
32
33use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};
34
35use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
36use crate::arrow::ArrowSchemaConverter;
37use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
38use crate::column::writer::encoder::ColumnValueEncoder;
39use crate::column::writer::{
40    get_column_writer, ColumnCloseResult, ColumnWriter, GenericColumnWriter,
41};
42use crate::data_type::{ByteArray, FixedLenByteArray};
43use crate::errors::{ParquetError, Result};
44use crate::file::metadata::{KeyValue, RowGroupMetaData};
45use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
46use crate::file::reader::{ChunkReader, Length};
47use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
48use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
49use crate::thrift::TSerializable;
50use levels::{calculate_array_levels, ArrayLevels};
51
52mod byte_array;
53mod levels;
54
55/// Encodes [`RecordBatch`] to parquet
56///
/// Writes Arrow `RecordBatch`es to a Parquet writer. Multiple [`RecordBatch`]es will be encoded
/// into the same row group, up to `max_row_group_size` rows. Any remaining rows will be
/// flushed on close, so the final row group in the output file may contain fewer than
/// `max_row_group_size` rows.
61///
62/// ```
63/// # use std::sync::Arc;
64/// # use bytes::Bytes;
65/// # use arrow_array::{ArrayRef, Int64Array};
66/// # use arrow_array::RecordBatch;
67/// # use parquet::arrow::arrow_writer::ArrowWriter;
68/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
69/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
70/// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
71///
72/// let mut buffer = Vec::new();
73/// let mut writer = ArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
74/// writer.write(&to_write).unwrap();
75/// writer.close().unwrap();
76///
77/// let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap();
78/// let read = reader.next().unwrap().unwrap();
79///
80/// assert_eq!(to_write, read);
81/// ```
82///
83/// ## Memory Limiting
84///
85/// The nature of parquet forces buffering of an entire row group before it can
86/// be flushed to the underlying writer. Data is mostly buffered in its encoded
/// form, reducing memory usage. However, some data such as dictionary keys,
/// large strings, or deeply nested data may still result in non-trivial memory
/// usage.
90///
91/// See Also:
92/// * [`ArrowWriter::memory_size`]: the current memory usage of the writer.
/// * [`ArrowWriter::in_progress_size`]: the estimated size of the buffered row group.
94///
95/// Call [`Self::flush`] to trigger an early flush of a row group based on a
/// memory threshold and/or global memory pressure. However, smaller row groups
97/// result in higher metadata overheads, and thus may worsen compression ratios
98/// and query performance.
99///
100/// ```no_run
101/// # use std::io::Write;
102/// # use arrow_array::RecordBatch;
103/// # use parquet::arrow::ArrowWriter;
104/// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
105/// # let batch: RecordBatch = todo!();
106/// writer.write(&batch).unwrap();
107/// // Trigger an early flush if anticipated size exceeds 1_000_000
108/// if writer.in_progress_size() > 1_000_000 {
109///     writer.flush().unwrap();
110/// }
111/// ```
112///
113/// ## Type Support
114///
115/// The writer supports writing all Arrow [`DataType`]s that have a direct mapping to
/// Parquet types, including [`StructArray`] and [`ListArray`].
117///
118/// The following are not supported:
119///
120/// * [`IntervalMonthDayNanoArray`]: Parquet does not [support nanosecond intervals].
121///
122/// [`DataType`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html
123/// [`StructArray`]: https://docs.rs/arrow/latest/arrow/array/struct.StructArray.html
124/// [`ListArray`]: https://docs.rs/arrow/latest/arrow/array/type.ListArray.html
125/// [`IntervalMonthDayNanoArray`]: https://docs.rs/arrow/latest/arrow/array/type.IntervalMonthDayNanoArray.html
126/// [support nanosecond intervals]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
127pub struct ArrowWriter<W: Write> {
128    /// Underlying Parquet writer
129    writer: SerializedFileWriter<W>,
130
131    /// The in-progress row group if any
132    in_progress: Option<ArrowRowGroupWriter>,
133
134    /// A copy of the Arrow schema.
135    ///
136    /// The schema is used to verify that each record batch written has the correct schema
137    arrow_schema: SchemaRef,
138
    /// The maximum number of rows to write in each row group
140    max_row_group_size: usize,
141}
142
143impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
144    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145        let buffered_memory = self.in_progress_size();
146        f.debug_struct("ArrowWriter")
147            .field("writer", &self.writer)
148            .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
149            .field("in_progress_rows", &self.in_progress_rows())
150            .field("arrow_schema", &self.arrow_schema)
151            .field("max_row_group_size", &self.max_row_group_size)
152            .finish()
153    }
154}
155
156impl<W: Write + Send> ArrowWriter<W> {
157    /// Try to create a new Arrow writer
158    ///
159    /// The writer will fail if:
160    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
161    ///  * the Arrow schema contains unsupported datatypes such as Unions
162    pub fn try_new(
163        writer: W,
164        arrow_schema: SchemaRef,
165        props: Option<WriterProperties>,
166    ) -> Result<Self> {
167        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
168        Self::try_new_with_options(writer, arrow_schema, options)
169    }
170
171    /// Try to create a new Arrow writer with [`ArrowWriterOptions`].
172    ///
173    /// The writer will fail if:
174    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
175    ///  * the Arrow schema contains unsupported datatypes such as Unions
176    pub fn try_new_with_options(
177        writer: W,
178        arrow_schema: SchemaRef,
179        options: ArrowWriterOptions,
180    ) -> Result<Self> {
181        let mut props = options.properties;
182        let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
183        if let Some(schema_root) = &options.schema_root {
184            converter = converter.schema_root(schema_root);
185        }
186        let schema = converter.convert(&arrow_schema)?;
187        if !options.skip_arrow_metadata {
188            // add serialized arrow schema
189            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
190        }
191
192        let max_row_group_size = props.max_row_group_size();
193
194        let file_writer =
195            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::new(props))?;
196
197        Ok(Self {
198            writer: file_writer,
199            in_progress: None,
200            arrow_schema,
201            max_row_group_size,
202        })
203    }
204
205    /// Returns metadata for any flushed row groups
206    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
207        self.writer.flushed_row_groups()
208    }
209
210    /// Estimated memory usage, in bytes, of this `ArrowWriter`
211    ///
    /// This estimate is formed by summing the values of
    /// [`ArrowColumnWriter::memory_size`] for all in-progress columns.
214    pub fn memory_size(&self) -> usize {
215        match &self.in_progress {
216            Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
217            None => 0,
218        }
219    }
220
    /// Anticipated encoded size of the in-progress row group.
    ///
    /// This estimates the size of the row group after it has been completely
    /// encoded, formed by summing the values of
    /// [`ArrowColumnWriter::get_estimated_total_bytes`] for all in-progress
    /// columns.
227    pub fn in_progress_size(&self) -> usize {
228        match &self.in_progress {
229            Some(in_progress) => in_progress
230                .writers
231                .iter()
232                .map(|x| x.get_estimated_total_bytes())
233                .sum(),
234            None => 0,
235        }
236    }
237
238    /// Returns the number of rows buffered in the in progress row group
239    pub fn in_progress_rows(&self) -> usize {
240        self.in_progress
241            .as_ref()
242            .map(|x| x.buffered_rows)
243            .unwrap_or_default()
244    }
245
246    /// Returns the number of bytes written by this instance
247    pub fn bytes_written(&self) -> usize {
248        self.writer.bytes_written()
249    }
250
251    /// Encodes the provided [`RecordBatch`]
252    ///
253    /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`]
254    /// rows, the contents of `batch` will be written to one or more row groups such that all but
255    /// the final row group in the file contain [`WriterProperties::max_row_group_size`] rows.
256    ///
257    /// This will fail if the `batch`'s schema does not match the writer's schema.
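    ///
    /// For example (an illustrative sketch), writing 100 rows with a
    /// `max_row_group_size` of 10 produces 10 row groups:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
    /// # use parquet::arrow::arrow_writer::ArrowWriter;
    /// # use parquet::file::properties::WriterProperties;
    /// let col = Arc::new(Int64Array::from_iter_values(0..100)) as ArrayRef;
    /// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
    /// let props = WriterProperties::builder().set_max_row_group_size(10).build();
    /// let mut writer = ArrowWriter::try_new(Vec::<u8>::new(), batch.schema(), Some(props)).unwrap();
    /// writer.write(&batch).unwrap();
    /// let metadata = writer.close().unwrap();
    /// // All 100 rows were split into row groups of 10 rows each
    /// assert_eq!(metadata.row_groups.len(), 10);
    /// ```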
258    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
259        if batch.num_rows() == 0 {
260            return Ok(());
261        }
262
263        let in_progress = match &mut self.in_progress {
264            Some(in_progress) => in_progress,
265            x => x.insert(ArrowRowGroupWriter::new(
266                self.writer.schema_descr(),
267                self.writer.properties(),
268                &self.arrow_schema,
269            )?),
270        };
271
272        // If would exceed max_row_group_size, split batch
273        if in_progress.buffered_rows + batch.num_rows() > self.max_row_group_size {
274            let to_write = self.max_row_group_size - in_progress.buffered_rows;
275            let a = batch.slice(0, to_write);
276            let b = batch.slice(to_write, batch.num_rows() - to_write);
277            self.write(&a)?;
278            return self.write(&b);
279        }
280
281        in_progress.write(batch)?;
282
283        if in_progress.buffered_rows >= self.max_row_group_size {
284            self.flush()?
285        }
286        Ok(())
287    }
288
289    /// Flushes all buffered rows into a new row group
290    pub fn flush(&mut self) -> Result<()> {
291        let in_progress = match self.in_progress.take() {
292            Some(in_progress) => in_progress,
293            None => return Ok(()),
294        };
295
296        let mut row_group_writer = self.writer.next_row_group()?;
297        for chunk in in_progress.close()? {
298            chunk.append_to_row_group(&mut row_group_writer)?;
299        }
300        row_group_writer.close()?;
301        Ok(())
302    }
303
    /// Appends [`KeyValue`] metadata to be written in addition to that from [`WriterProperties`]
    ///
    /// This method provides a way to append key-value metadata after writing record batches.
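    ///
    /// For example (a minimal sketch; the key and value shown are arbitrary):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
    /// # use parquet::arrow::arrow_writer::ArrowWriter;
    /// # use parquet::file::metadata::KeyValue;
    /// # let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
    /// # let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
    /// let mut writer = ArrowWriter::try_new(Vec::<u8>::new(), batch.schema(), None).unwrap();
    /// writer.write(&batch).unwrap();
    /// // Append custom key-value metadata; it is written out when the file is closed
    /// writer.append_key_value_metadata(KeyValue::new("origin".to_string(), "example".to_string()));
    /// writer.close().unwrap();
    /// ```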
307    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
308        self.writer.append_key_value_metadata(kv_metadata)
309    }
310
311    /// Returns a reference to the underlying writer.
312    pub fn inner(&self) -> &W {
313        self.writer.inner()
314    }
315
316    /// Returns a mutable reference to the underlying writer.
317    ///
    /// It is inadvisable to write directly to the underlying writer, as doing so
    /// will likely result in a corrupt Parquet file.
320    pub fn inner_mut(&mut self) -> &mut W {
321        self.writer.inner_mut()
322    }
323
324    /// Flushes any outstanding data and returns the underlying writer.
325    pub fn into_inner(mut self) -> Result<W> {
326        self.flush()?;
327        self.writer.into_inner()
328    }
329
330    /// Close and finalize the underlying Parquet writer
331    ///
332    /// Unlike [`Self::close`] this does not consume self
333    ///
334    /// Attempting to write after calling finish will result in an error
335    pub fn finish(&mut self) -> Result<crate::format::FileMetaData> {
336        self.flush()?;
337        self.writer.finish()
338    }
339
340    /// Close and finalize the underlying Parquet writer
341    pub fn close(mut self) -> Result<crate::format::FileMetaData> {
342        self.finish()
343    }
344}
345
346impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
347    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
348        self.write(batch).map_err(|e| e.into())
349    }
350
351    fn close(self) -> std::result::Result<(), ArrowError> {
352        self.close()?;
353        Ok(())
354    }
355}
356
357/// Arrow-specific configuration settings for writing parquet files.
358///
359/// See [`ArrowWriter`] for how to configure the writer.
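///
/// For example, a minimal sketch of configuring a writer that omits the embedded
/// Arrow schema metadata:
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
/// # use parquet::arrow::arrow_writer::{ArrowWriter, ArrowWriterOptions};
/// # use parquet::file::properties::WriterProperties;
/// # let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
/// # let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
/// let options = ArrowWriterOptions::new()
///     .with_properties(WriterProperties::default())
///     .with_skip_arrow_metadata(true);
/// let mut writer =
///     ArrowWriter::try_new_with_options(Vec::<u8>::new(), batch.schema(), options).unwrap();
/// writer.write(&batch).unwrap();
/// writer.close().unwrap();
/// ```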
360#[derive(Debug, Clone, Default)]
361pub struct ArrowWriterOptions {
362    properties: WriterProperties,
363    skip_arrow_metadata: bool,
364    schema_root: Option<String>,
365}
366
367impl ArrowWriterOptions {
368    /// Creates a new [`ArrowWriterOptions`] with the default settings.
369    pub fn new() -> Self {
370        Self::default()
371    }
372
373    /// Sets the [`WriterProperties`] for writing parquet files.
374    pub fn with_properties(self, properties: WriterProperties) -> Self {
375        Self { properties, ..self }
376    }
377
378    /// Skip encoding the embedded arrow metadata (defaults to `false`)
379    ///
    /// Parquet files generated by the [`ArrowWriter`] contain the embedded Arrow schema
    /// by default.
    ///
    /// Set `skip_arrow_metadata` to `true` to skip encoding the embedded metadata.
384    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
385        Self {
386            skip_arrow_metadata,
387            ..self
388        }
389    }
390
391    /// Set the name of the root parquet schema element (defaults to `"arrow_schema"`)
392    pub fn with_schema_root(self, schema_root: String) -> Self {
393        Self {
394            schema_root: Some(schema_root),
395            ..self
396        }
397    }
398}
399
400/// A single column chunk produced by [`ArrowColumnWriter`]
401#[derive(Default)]
402struct ArrowColumnChunkData {
403    length: usize,
404    data: Vec<Bytes>,
405}
406
407impl Length for ArrowColumnChunkData {
408    fn len(&self) -> u64 {
409        self.length as _
410    }
411}
412
413impl ChunkReader for ArrowColumnChunkData {
414    type T = ArrowColumnChunkReader;
415
416    fn get_read(&self, start: u64) -> Result<Self::T> {
417        assert_eq!(start, 0); // Assume append_column writes all data in one-shot
418        Ok(ArrowColumnChunkReader(
419            self.data.clone().into_iter().peekable(),
420        ))
421    }
422
423    fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
424        unimplemented!()
425    }
426}
427
428/// A [`Read`] for [`ArrowColumnChunkData`]
429struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
430
431impl Read for ArrowColumnChunkReader {
432    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
433        let buffer = loop {
434            match self.0.peek_mut() {
435                Some(b) if b.is_empty() => {
436                    self.0.next();
437                    continue;
438                }
439                Some(b) => break b,
440                None => return Ok(0),
441            }
442        };
443
444        let len = buffer.len().min(out.len());
445        let b = buffer.split_to(len);
446        out[..len].copy_from_slice(&b);
447        Ok(len)
448    }
449}
450
451/// A shared [`ArrowColumnChunkData`]
452///
453/// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access via
454/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows
455type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
456
457#[derive(Default)]
458struct ArrowPageWriter {
459    buffer: SharedColumnChunk,
460}
461
462impl PageWriter for ArrowPageWriter {
463    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
464        let mut buf = self.buffer.try_lock().unwrap();
465        let page_header = page.to_thrift_header();
466        let header = {
467            let mut header = Vec::with_capacity(1024);
468            let mut protocol = TCompactOutputProtocol::new(&mut header);
469            page_header.write_to_out_protocol(&mut protocol)?;
470            Bytes::from(header)
471        };
472
473        let data = page.compressed_page().buffer().clone();
474        let compressed_size = data.len() + header.len();
475
476        let mut spec = PageWriteSpec::new();
477        spec.page_type = page.page_type();
478        spec.num_values = page.num_values();
479        spec.uncompressed_size = page.uncompressed_size() + header.len();
480        spec.offset = buf.length as u64;
481        spec.compressed_size = compressed_size;
482        spec.bytes_written = compressed_size as u64;
483
484        buf.length += compressed_size;
485        buf.data.push(header);
486        buf.data.push(data);
487
488        Ok(spec)
489    }
490
491    fn close(&mut self) -> Result<()> {
492        Ok(())
493    }
494}
495
496/// A leaf column that can be encoded by [`ArrowColumnWriter`]
497#[derive(Debug)]
498pub struct ArrowLeafColumn(ArrayLevels);
499
500/// Computes the [`ArrowLeafColumn`] for a potentially nested [`ArrayRef`]
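///
/// For example (a minimal sketch), a flat primitive column yields a single leaf:
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int32Array};
/// # use arrow_schema::{DataType, Field};
/// # use parquet::arrow::arrow_writer::compute_leaves;
/// let field = Field::new("a", DataType::Int32, false);
/// let array = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
/// // A non-nested field maps to exactly one parquet leaf column
/// let leaves = compute_leaves(&field, &array).unwrap();
/// assert_eq!(leaves.len(), 1);
/// ```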
501pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
502    let levels = calculate_array_levels(array, field)?;
503    Ok(levels.into_iter().map(ArrowLeafColumn).collect())
504}
505
506/// The data for a single column chunk, see [`ArrowColumnWriter`]
507pub struct ArrowColumnChunk {
508    data: ArrowColumnChunkData,
509    close: ColumnCloseResult,
510}
511
512impl std::fmt::Debug for ArrowColumnChunk {
513    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
514        f.debug_struct("ArrowColumnChunk")
515            .field("length", &self.data.length)
516            .finish_non_exhaustive()
517    }
518}
519
520impl ArrowColumnChunk {
521    /// Calls [`SerializedRowGroupWriter::append_column`] with this column's data
522    pub fn append_to_row_group<W: Write + Send>(
523        self,
524        writer: &mut SerializedRowGroupWriter<'_, W>,
525    ) -> Result<()> {
526        writer.append_column(&self.data, self.close)
527    }
528}
529
530/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
531///
532/// Note: This is a low-level interface for applications that require fine-grained control
/// of encoding; see [`ArrowWriter`] for a higher-level interface.
534///
535/// ```
536/// // The arrow schema
537/// # use std::sync::Arc;
538/// # use arrow_array::*;
539/// # use arrow_schema::*;
540/// # use parquet::arrow::ArrowSchemaConverter;
541/// # use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, get_column_writers};
542/// # use parquet::file::properties::WriterProperties;
543/// # use parquet::file::writer::SerializedFileWriter;
544/// #
545/// let schema = Arc::new(Schema::new(vec![
546///     Field::new("i32", DataType::Int32, false),
547///     Field::new("f32", DataType::Float32, false),
548/// ]));
549///
550/// // Compute the parquet schema
551/// let props = Arc::new(WriterProperties::default());
552/// let parquet_schema = ArrowSchemaConverter::new()
553///   .with_coerce_types(props.coerce_types())
554///   .convert(&schema)
555///   .unwrap();
556///
557/// // Create writers for each of the leaf columns
558/// let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
559///
560/// // Spawn a worker thread for each column
/// // This is for demonstration purposes; a thread pool (e.g. rayon or tokio) would be better
562/// let mut workers: Vec<_> = col_writers
563///     .into_iter()
564///     .map(|mut col_writer| {
565///         let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
566///         let handle = std::thread::spawn(move || {
567///             for col in recv {
568///                 col_writer.write(&col)?;
569///             }
570///             col_writer.close()
571///         });
572///         (handle, send)
573///     })
574///     .collect();
575///
576/// // Create parquet writer
577/// let root_schema = parquet_schema.root_schema_ptr();
578/// let mut out = Vec::with_capacity(1024); // This could be a File
579/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone()).unwrap();
580///
581/// // Start row group
582/// let mut row_group = writer.next_row_group().unwrap();
583///
584/// // Columns to encode
585/// let to_write = vec![
586///     Arc::new(Int32Array::from_iter_values([1, 2, 3])) as _,
587///     Arc::new(Float32Array::from_iter_values([1., 45., -1.])) as _,
588/// ];
589///
590/// // Spawn work to encode columns
591/// let mut worker_iter = workers.iter_mut();
592/// for (arr, field) in to_write.iter().zip(&schema.fields) {
593///     for leaves in compute_leaves(field, arr).unwrap() {
594///         worker_iter.next().unwrap().1.send(leaves).unwrap();
595///     }
596/// }
597///
598/// // Finish up parallel column encoding
599/// for (handle, send) in workers {
600///     drop(send); // Drop send side to signal termination
601///     let chunk = handle.join().unwrap().unwrap();
602///     chunk.append_to_row_group(&mut row_group).unwrap();
603/// }
604/// row_group.close().unwrap();
605///
606/// let metadata = writer.close().unwrap();
607/// assert_eq!(metadata.num_rows, 3);
608/// ```
609pub struct ArrowColumnWriter {
610    writer: ArrowColumnWriterImpl,
611    chunk: SharedColumnChunk,
612}
613
614impl std::fmt::Debug for ArrowColumnWriter {
615    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
616        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
617    }
618}
619
620enum ArrowColumnWriterImpl {
621    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
622    Column(ColumnWriter<'static>),
623}
624
625impl ArrowColumnWriter {
626    /// Write an [`ArrowLeafColumn`]
627    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
628        match &mut self.writer {
629            ArrowColumnWriterImpl::Column(c) => {
630                write_leaf(c, &col.0)?;
631            }
632            ArrowColumnWriterImpl::ByteArray(c) => {
633                write_primitive(c, col.0.array().as_ref(), &col.0)?;
634            }
635        }
636        Ok(())
637    }
638
639    /// Close this column returning the written [`ArrowColumnChunk`]
640    pub fn close(self) -> Result<ArrowColumnChunk> {
641        let close = match self.writer {
642            ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
643            ArrowColumnWriterImpl::Column(c) => c.close()?,
644        };
645        let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
646        let data = chunk.into_inner().unwrap();
647        Ok(ArrowColumnChunk { data, close })
648    }
649
650    /// Returns the estimated total memory usage by the writer.
651    ///
    /// Unlike [`Self::get_estimated_total_bytes`], this is an estimate
    /// of the current memory usage, not its anticipated encoded size.
654    ///
655    /// This includes:
656    /// 1. Data buffered in encoded form
657    /// 2. Data buffered in un-encoded form (e.g. `usize` dictionary keys)
658    ///
659    /// This value should be greater than or equal to [`Self::get_estimated_total_bytes`]
660    pub fn memory_size(&self) -> usize {
661        match &self.writer {
662            ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
663            ArrowColumnWriterImpl::Column(c) => c.memory_size(),
664        }
665    }
666
667    /// Returns the estimated total encoded bytes for this column writer.
668    ///
669    /// This includes:
670    /// 1. Data buffered in encoded form
671    /// 2. An estimate of how large the data buffered in un-encoded form would be once encoded
672    ///
673    /// This value should be less than or equal to [`Self::memory_size`]
674    pub fn get_estimated_total_bytes(&self) -> usize {
675        match &self.writer {
676            ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
677            ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
678        }
679    }
680}
681
682/// Encodes [`RecordBatch`] to a parquet row group
683struct ArrowRowGroupWriter {
684    writers: Vec<ArrowColumnWriter>,
685    schema: SchemaRef,
686    buffered_rows: usize,
687}
688
689impl ArrowRowGroupWriter {
690    fn new(
691        parquet: &SchemaDescriptor,
692        props: &WriterPropertiesPtr,
693        arrow: &SchemaRef,
694    ) -> Result<Self> {
695        let writers = get_column_writers(parquet, props, arrow)?;
696        Ok(Self {
697            writers,
698            schema: arrow.clone(),
699            buffered_rows: 0,
700        })
701    }
702
703    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
704        self.buffered_rows += batch.num_rows();
705        let mut writers = self.writers.iter_mut();
706        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
707            for leaf in compute_leaves(field.as_ref(), column)? {
708                writers.next().unwrap().write(&leaf)?
709            }
710        }
711        Ok(())
712    }
713
714    fn close(self) -> Result<Vec<ArrowColumnChunk>> {
715        self.writers
716            .into_iter()
717            .map(|writer| writer.close())
718            .collect()
719    }
720}
721
/// Returns the [`ArrowColumnWriter`]s for a given schema
723pub fn get_column_writers(
724    parquet: &SchemaDescriptor,
725    props: &WriterPropertiesPtr,
726    arrow: &SchemaRef,
727) -> Result<Vec<ArrowColumnWriter>> {
728    let mut writers = Vec::with_capacity(arrow.fields.len());
729    let mut leaves = parquet.columns().iter();
730    for field in &arrow.fields {
731        get_arrow_column_writer(field.data_type(), props, &mut leaves, &mut writers)?;
732    }
733    Ok(writers)
734}
735
736/// Gets the [`ArrowColumnWriter`] for the given `data_type`
737fn get_arrow_column_writer(
738    data_type: &ArrowDataType,
739    props: &WriterPropertiesPtr,
740    leaves: &mut Iter<'_, ColumnDescPtr>,
741    out: &mut Vec<ArrowColumnWriter>,
742) -> Result<()> {
743    let col = |desc: &ColumnDescPtr| {
744        let page_writer = Box::<ArrowPageWriter>::default();
745        let chunk = page_writer.buffer.clone();
746        let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
747        ArrowColumnWriter {
748            chunk,
749            writer: ArrowColumnWriterImpl::Column(writer),
750        }
751    };
752
753    let bytes = |desc: &ColumnDescPtr| {
754        let page_writer = Box::<ArrowPageWriter>::default();
755        let chunk = page_writer.buffer.clone();
756        let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
757        ArrowColumnWriter {
758            chunk,
759            writer: ArrowColumnWriterImpl::ByteArray(writer),
760        }
761    };
762
763    match data_type {
764        _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())),
        ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
            out.push(col(leaves.next().unwrap()))
        }
766        ArrowDataType::LargeBinary
767        | ArrowDataType::Binary
768        | ArrowDataType::Utf8
769        | ArrowDataType::LargeUtf8
770        | ArrowDataType::BinaryView
771        | ArrowDataType::Utf8View => {
772            out.push(bytes(leaves.next().unwrap()))
773        }
774        ArrowDataType::List(f)
775        | ArrowDataType::LargeList(f)
776        | ArrowDataType::FixedSizeList(f, _) => {
777            get_arrow_column_writer(f.data_type(), props, leaves, out)?
778        }
779        ArrowDataType::Struct(fields) => {
780            for field in fields {
781                get_arrow_column_writer(field.data_type(), props, leaves, out)?
782            }
783        }
784        ArrowDataType::Map(f, _) => match f.data_type() {
785            ArrowDataType::Struct(f) => {
786                get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
787                get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
788            }
789            _ => unreachable!("invalid map type"),
790        }
791        ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
            ArrowDataType::Utf8
            | ArrowDataType::LargeUtf8
            | ArrowDataType::Binary
            | ArrowDataType::LargeBinary => {
                out.push(bytes(leaves.next().unwrap()))
            }
795            ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
796                out.push(bytes(leaves.next().unwrap()))
797            }
798            _ => {
799                out.push(col(leaves.next().unwrap()))
800            }
801        }
802       _ => return Err(ParquetError::NYI(
803           format!(
804               "Attempting to write an Arrow type {data_type:?} to parquet that is not yet implemented"
805           )
806       ))
807    }
808    Ok(())
809}
810
811fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
812    let column = levels.array().as_ref();
813    let indices = levels.non_null_indices();
814    match writer {
815        ColumnWriter::Int32ColumnWriter(ref mut typed) => {
816            match column.data_type() {
817                ArrowDataType::Date64 => {
818                    // If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
819                    let array = arrow_cast::cast(column, &ArrowDataType::Date32)?;
820                    let array = arrow_cast::cast(&array, &ArrowDataType::Int32)?;
821
822                    let array = array.as_primitive::<Int32Type>();
823                    write_primitive(typed, array.values(), levels)
824                }
825                ArrowDataType::UInt32 => {
826                    let values = column.as_primitive::<UInt32Type>().values();
827                    // follow C++ implementation and use overflow/reinterpret cast from  u32 to i32 which will map
828                    // `(i32::MAX as u32)..u32::MAX` to `i32::MIN..0`
829                    let array = values.inner().typed_data::<i32>();
830                    write_primitive(typed, array, levels)
831                }
832                ArrowDataType::Decimal128(_, _) => {
833                    // use the int32 to represent the decimal with low precision
834                    let array = column
835                        .as_primitive::<Decimal128Type>()
836                        .unary::<_, Int32Type>(|v| v as i32);
837                    write_primitive(typed, array.values(), levels)
838                }
839                ArrowDataType::Decimal256(_, _) => {
840                    // use the int32 to represent the decimal with low precision
841                    let array = column
842                        .as_primitive::<Decimal256Type>()
843                        .unary::<_, Int32Type>(|v| v.as_i128() as i32);
844                    write_primitive(typed, array.values(), levels)
845                }
846                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
847                    ArrowDataType::Decimal128(_, _) => {
848                        let array = arrow_cast::cast(column, value_type)?;
849                        let array = array
850                            .as_primitive::<Decimal128Type>()
851                            .unary::<_, Int32Type>(|v| v as i32);
852                        write_primitive(typed, array.values(), levels)
853                    }
854                    ArrowDataType::Decimal256(_, _) => {
855                        let array = arrow_cast::cast(column, value_type)?;
856                        let array = array
857                            .as_primitive::<Decimal256Type>()
858                            .unary::<_, Int32Type>(|v| v.as_i128() as i32);
859                        write_primitive(typed, array.values(), levels)
860                    }
861                    _ => {
862                        let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
863                        let array = array.as_primitive::<Int32Type>();
864                        write_primitive(typed, array.values(), levels)
865                    }
866                },
867                _ => {
868                    let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
869                    let array = array.as_primitive::<Int32Type>();
870                    write_primitive(typed, array.values(), levels)
871                }
872            }
873        }
874        ColumnWriter::BoolColumnWriter(ref mut typed) => {
875            let array = column.as_boolean();
876            typed.write_batch(
877                get_bool_array_slice(array, indices).as_slice(),
878                levels.def_levels(),
879                levels.rep_levels(),
880            )
881        }
882        ColumnWriter::Int64ColumnWriter(ref mut typed) => {
883            match column.data_type() {
884                ArrowDataType::Date64 => {
885                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
886
887                    let array = array.as_primitive::<Int64Type>();
888                    write_primitive(typed, array.values(), levels)
889                }
890                ArrowDataType::Int64 => {
891                    let array = column.as_primitive::<Int64Type>();
892                    write_primitive(typed, array.values(), levels)
893                }
894                ArrowDataType::UInt64 => {
895                    let values = column.as_primitive::<UInt64Type>().values();
896                    // follow C++ implementation and use overflow/reinterpret cast from  u64 to i64 which will map
897                    // `(i64::MAX as u64)..u64::MAX` to `i64::MIN..0`
898                    let array = values.inner().typed_data::<i64>();
899                    write_primitive(typed, array, levels)
900                }
901                ArrowDataType::Decimal128(_, _) => {
902                    // use the int64 to represent the decimal with low precision
903                    let array = column
904                        .as_primitive::<Decimal128Type>()
905                        .unary::<_, Int64Type>(|v| v as i64);
906                    write_primitive(typed, array.values(), levels)
907                }
908                ArrowDataType::Decimal256(_, _) => {
909                    // use the int64 to represent the decimal with low precision
910                    let array = column
911                        .as_primitive::<Decimal256Type>()
912                        .unary::<_, Int64Type>(|v| v.as_i128() as i64);
913                    write_primitive(typed, array.values(), levels)
914                }
915                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
916                    ArrowDataType::Decimal128(_, _) => {
917                        let array = arrow_cast::cast(column, value_type)?;
918                        let array = array
919                            .as_primitive::<Decimal128Type>()
920                            .unary::<_, Int64Type>(|v| v as i64);
921                        write_primitive(typed, array.values(), levels)
922                    }
923                    ArrowDataType::Decimal256(_, _) => {
924                        let array = arrow_cast::cast(column, value_type)?;
925                        let array = array
926                            .as_primitive::<Decimal256Type>()
927                            .unary::<_, Int64Type>(|v| v.as_i128() as i64);
928                        write_primitive(typed, array.values(), levels)
929                    }
930                    _ => {
931                        let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
932                        let array = array.as_primitive::<Int64Type>();
933                        write_primitive(typed, array.values(), levels)
934                    }
935                },
936                _ => {
937                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
938                    let array = array.as_primitive::<Int64Type>();
939                    write_primitive(typed, array.values(), levels)
940                }
941            }
942        }
943        ColumnWriter::Int96ColumnWriter(ref mut _typed) => {
944            unreachable!("Currently unreachable because data type not supported")
945        }
946        ColumnWriter::FloatColumnWriter(ref mut typed) => {
947            let array = column.as_primitive::<Float32Type>();
948            write_primitive(typed, array.values(), levels)
949        }
950        ColumnWriter::DoubleColumnWriter(ref mut typed) => {
951            let array = column.as_primitive::<Float64Type>();
952            write_primitive(typed, array.values(), levels)
953        }
954        ColumnWriter::ByteArrayColumnWriter(_) => {
955            unreachable!("should use ByteArrayWriter")
956        }
957        ColumnWriter::FixedLenByteArrayColumnWriter(ref mut typed) => {
958            let bytes = match column.data_type() {
959                ArrowDataType::Interval(interval_unit) => match interval_unit {
960                    IntervalUnit::YearMonth => {
961                        let array = column
962                            .as_any()
963                            .downcast_ref::<arrow_array::IntervalYearMonthArray>()
964                            .unwrap();
965                        get_interval_ym_array_slice(array, indices)
966                    }
967                    IntervalUnit::DayTime => {
968                        let array = column
969                            .as_any()
970                            .downcast_ref::<arrow_array::IntervalDayTimeArray>()
971                            .unwrap();
972                        get_interval_dt_array_slice(array, indices)
973                    }
974                    _ => {
975                        return Err(ParquetError::NYI(
976                            format!(
977                                "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
978                            )
979                        ));
980                    }
981                },
982                ArrowDataType::FixedSizeBinary(_) => {
983                    let array = column
984                        .as_any()
985                        .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
986                        .unwrap();
987                    get_fsb_array_slice(array, indices)
988                }
989                ArrowDataType::Decimal128(_, _) => {
990                    let array = column.as_primitive::<Decimal128Type>();
991                    get_decimal_128_array_slice(array, indices)
992                }
993                ArrowDataType::Decimal256(_, _) => {
994                    let array = column
995                        .as_any()
996                        .downcast_ref::<arrow_array::Decimal256Array>()
997                        .unwrap();
998                    get_decimal_256_array_slice(array, indices)
999                }
1000                ArrowDataType::Float16 => {
1001                    let array = column.as_primitive::<Float16Type>();
1002                    get_float_16_array_slice(array, indices)
1003                }
1004                _ => {
1005                    return Err(ParquetError::NYI(
1006                        "Attempting to write an Arrow type that is not yet implemented".to_string(),
1007                    ));
1008                }
1009            };
1010            typed.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels())
1011        }
1012    }
1013}
1014
1015fn write_primitive<E: ColumnValueEncoder>(
1016    writer: &mut GenericColumnWriter<E>,
1017    values: &E::Values,
1018    levels: &ArrayLevels,
1019) -> Result<usize> {
1020    writer.write_batch_internal(
1021        values,
1022        Some(levels.non_null_indices()),
1023        levels.def_levels(),
1024        levels.rep_levels(),
1025        None,
1026        None,
1027        None,
1028    )
1029}
1030
1031fn get_bool_array_slice(array: &arrow_array::BooleanArray, indices: &[usize]) -> Vec<bool> {
1032    let mut values = Vec::with_capacity(indices.len());
1033    for i in indices {
1034        values.push(array.value(*i))
1035    }
1036    values
1037}
1038
1039/// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each).
1040/// An Arrow YearMonth interval only stores months, thus only the first 4 bytes are populated.
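///
/// For example, a YearMonth value of 5 months is encoded (little-endian) as
/// `[5, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]`.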
1041fn get_interval_ym_array_slice(
1042    array: &arrow_array::IntervalYearMonthArray,
1043    indices: &[usize],
1044) -> Vec<FixedLenByteArray> {
1045    let mut values = Vec::with_capacity(indices.len());
1046    for i in indices {
1047        let mut value = array.value(*i).to_le_bytes().to_vec();
1048        let mut suffix = vec![0; 8];
1049        value.append(&mut suffix);
1050        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1051    }
1052    values
1053}
1054
1055/// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each).
1056/// An Arrow DayTime interval only stores days and millis, thus the first 4 bytes are not populated.
1057fn get_interval_dt_array_slice(
1058    array: &arrow_array::IntervalDayTimeArray,
1059    indices: &[usize],
1060) -> Vec<FixedLenByteArray> {
1061    let mut values = Vec::with_capacity(indices.len());
1062    for i in indices {
1063        let mut out = [0; 12];
1064        let value = array.value(*i);
1065        out[4..8].copy_from_slice(&value.days.to_le_bytes());
1066        out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1067        values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1068    }
1069    values
1070}
1071
1072fn get_decimal_128_array_slice(
1073    array: &arrow_array::Decimal128Array,
1074    indices: &[usize],
1075) -> Vec<FixedLenByteArray> {
1076    let mut values = Vec::with_capacity(indices.len());
1077    let size = decimal_length_from_precision(array.precision());
1078    for i in indices {
1079        let as_be_bytes = array.value(*i).to_be_bytes();
1080        let resized_value = as_be_bytes[(16 - size)..].to_vec();
1081        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1082    }
1083    values
1084}
1085
1086fn get_decimal_256_array_slice(
1087    array: &arrow_array::Decimal256Array,
1088    indices: &[usize],
1089) -> Vec<FixedLenByteArray> {
1090    let mut values = Vec::with_capacity(indices.len());
1091    let size = decimal_length_from_precision(array.precision());
1092    for i in indices {
1093        let as_be_bytes = array.value(*i).to_be_bytes();
1094        let resized_value = as_be_bytes[(32 - size)..].to_vec();
1095        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1096    }
1097    values
1098}
1099
1100fn get_float_16_array_slice(
1101    array: &arrow_array::Float16Array,
1102    indices: &[usize],
1103) -> Vec<FixedLenByteArray> {
1104    let mut values = Vec::with_capacity(indices.len());
1105    for i in indices {
1106        let value = array.value(*i).to_le_bytes().to_vec();
1107        values.push(FixedLenByteArray::from(ByteArray::from(value)));
1108    }
1109    values
1110}
1111
1112fn get_fsb_array_slice(
1113    array: &arrow_array::FixedSizeBinaryArray,
1114    indices: &[usize],
1115) -> Vec<FixedLenByteArray> {
1116    let mut values = Vec::with_capacity(indices.len());
1117    for i in indices {
1118        let value = array.value(*i).to_vec();
1119        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1120    }
1121    values
1122}
1123
1124#[cfg(test)]
1125mod tests {
1126    use super::*;
1127
1128    use std::fs::File;
1129
1130    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1131    use crate::arrow::ARROW_SCHEMA_META_KEY;
1132    use arrow::datatypes::ToByteSlice;
1133    use arrow::datatypes::{DataType, Schema};
1134    use arrow::error::Result as ArrowResult;
1135    use arrow::util::data_gen::create_random_array;
1136    use arrow::util::pretty::pretty_format_batches;
1137    use arrow::{array::*, buffer::Buffer};
1138    use arrow_buffer::{i256, IntervalDayTime, IntervalMonthDayNano, NullBuffer};
1139    use arrow_schema::Fields;
1140    use half::f16;
1141
1142    use crate::basic::Encoding;
1143    use crate::data_type::AsBytes;
1144    use crate::file::metadata::ParquetMetaData;
1145    use crate::file::page_index::index::Index;
1146    use crate::file::page_index::index_reader::read_offset_indexes;
1147    use crate::file::properties::{
1148        BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1149    };
1150    use crate::file::serialized_reader::ReadOptionsBuilder;
1151    use crate::file::{
1152        reader::{FileReader, SerializedFileReader},
1153        statistics::Statistics,
1154    };
1155
1156    #[test]
1157    fn arrow_writer() {
1158        // define schema
1159        let schema = Schema::new(vec![
1160            Field::new("a", DataType::Int32, false),
1161            Field::new("b", DataType::Int32, true),
1162        ]);
1163
1164        // create some data
1165        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1166        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1167
1168        // build a record batch
1169        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
1170
1171        roundtrip(batch, Some(SMALL_SIZE / 2));
1172    }
1173
1174    fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1175        let mut buffer = vec![];
1176
1177        let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
1178        writer.write(expected_batch).unwrap();
1179        writer.close().unwrap();
1180
1181        buffer
1182    }
1183
1184    fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1185        let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
1186        writer.write(expected_batch).unwrap();
1187        writer.into_inner().unwrap()
1188    }
1189
1190    #[test]
1191    fn roundtrip_bytes() {
1192        // define schema
1193        let schema = Arc::new(Schema::new(vec![
1194            Field::new("a", DataType::Int32, false),
1195            Field::new("b", DataType::Int32, true),
1196        ]));
1197
1198        // create some data
1199        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1200        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1201
1202        // build a record batch
1203        let expected_batch =
1204            RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
1205
1206        for buffer in [
1207            get_bytes_after_close(schema.clone(), &expected_batch),
1208            get_bytes_by_into_inner(schema, &expected_batch),
1209        ] {
1210            let cursor = Bytes::from(buffer);
1211            let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
1212
1213            let actual_batch = record_batch_reader
1214                .next()
1215                .expect("No batch found")
1216                .expect("Unable to get batch");
1217
1218            assert_eq!(expected_batch.schema(), actual_batch.schema());
1219            assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
1220            assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
1221            for i in 0..expected_batch.num_columns() {
1222                let expected_data = expected_batch.column(i).to_data();
1223                let actual_data = actual_batch.column(i).to_data();
1224
1225                assert_eq!(expected_data, actual_data);
1226            }
1227        }
1228    }
1229
1230    #[test]
1231    fn arrow_writer_non_null() {
1232        // define schema
1233        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1234
1235        // create some data
1236        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1237
1238        // build a record batch
1239        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1240
1241        roundtrip(batch, Some(SMALL_SIZE / 2));
1242    }
1243
1244    #[test]
1245    fn arrow_writer_list() {
1246        // define schema
1247        let schema = Schema::new(vec![Field::new(
1248            "a",
1249            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1250            true,
1251        )]);
1252
1253        // create some data
1254        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1255
1256        // Construct a buffer for value offsets, for the nested array:
1257        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
1258        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1259
1260        // Construct a list array from the above two
1261        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1262            DataType::Int32,
1263            false,
1264        ))))
1265        .len(5)
1266        .add_buffer(a_value_offsets)
1267        .add_child_data(a_values.into_data())
1268        .null_bit_buffer(Some(Buffer::from([0b00011011])))
1269        .build()
1270        .unwrap();
1271        let a = ListArray::from(a_list_data);
1272
1273        // build a record batch
1274        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1275
1276        assert_eq!(batch.column(0).null_count(), 1);
1277
1278        // This test fails if the max row group size is less than the batch's length
1279        // see https://github.com/apache/arrow-rs/issues/518
1280        roundtrip(batch, None);
1281    }
1282
1283    #[test]
1284    fn arrow_writer_list_non_null() {
1285        // define schema
1286        let schema = Schema::new(vec![Field::new(
1287            "a",
1288            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1289            false,
1290        )]);
1291
1292        // create some data
1293        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1294
1295        // Construct a buffer for value offsets, for the nested array:
1296        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
1297        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1298
1299        // Construct a list array from the above two
1300        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1301            DataType::Int32,
1302            false,
1303        ))))
1304        .len(5)
1305        .add_buffer(a_value_offsets)
1306        .add_child_data(a_values.into_data())
1307        .build()
1308        .unwrap();
1309        let a = ListArray::from(a_list_data);
1310
1311        // build a record batch
1312        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1313
1314        // This test fails if the max row group size is less than the batch's length
1315        // see https://github.com/apache/arrow-rs/issues/518
1316        assert_eq!(batch.column(0).null_count(), 0);
1317
1318        roundtrip(batch, None);
1319    }
1320
1321    #[test]
1322    fn arrow_writer_binary() {
1323        let string_field = Field::new("a", DataType::Utf8, false);
1324        let binary_field = Field::new("b", DataType::Binary, false);
1325        let schema = Schema::new(vec![string_field, binary_field]);
1326
1327        let raw_string_values = vec!["foo", "bar", "baz", "quux"];
1328        let raw_binary_values = [
1329            b"foo".to_vec(),
1330            b"bar".to_vec(),
1331            b"baz".to_vec(),
1332            b"quux".to_vec(),
1333        ];
1334        let raw_binary_value_refs = raw_binary_values
1335            .iter()
1336            .map(|x| x.as_slice())
1337            .collect::<Vec<_>>();
1338
1339        let string_values = StringArray::from(raw_string_values.clone());
1340        let binary_values = BinaryArray::from(raw_binary_value_refs);
1341        let batch = RecordBatch::try_new(
1342            Arc::new(schema),
1343            vec![Arc::new(string_values), Arc::new(binary_values)],
1344        )
1345        .unwrap();
1346
1347        roundtrip(batch, Some(SMALL_SIZE / 2));
1348    }
1349
1350    #[test]
1351    fn arrow_writer_binary_view() {
1352        let string_field = Field::new("a", DataType::Utf8View, false);
1353        let binary_field = Field::new("b", DataType::BinaryView, false);
1354        let nullable_string_field = Field::new("a", DataType::Utf8View, true);
1355        let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
1356
1357        let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
1358        let raw_binary_values = vec![
1359            b"foo".to_vec(),
1360            b"bar".to_vec(),
1361            b"large payload over 12 bytes".to_vec(),
1362            b"lulu".to_vec(),
1363        ];
1364        let nullable_string_values =
1365            vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
1366
1367        let string_view_values = StringViewArray::from(raw_string_values);
1368        let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
1369        let nullable_string_view_values = StringViewArray::from(nullable_string_values);
1370        let batch = RecordBatch::try_new(
1371            Arc::new(schema),
1372            vec![
1373                Arc::new(string_view_values),
1374                Arc::new(binary_view_values),
1375                Arc::new(nullable_string_view_values),
1376            ],
1377        )
1378        .unwrap();
1379
1380        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1381        roundtrip(batch, None);
1382    }
1383
1384    fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
1385        let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
1386        let schema = Schema::new(vec![decimal_field]);
1387
1388        let decimal_values = vec![10_000, 50_000, 0, -100]
1389            .into_iter()
1390            .map(Some)
1391            .collect::<Decimal128Array>()
1392            .with_precision_and_scale(precision, scale)
1393            .unwrap();
1394
1395        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
1396    }
1397
1398    #[test]
1399    fn arrow_writer_decimal() {
1400        // int32 to store the decimal value
1401        let batch_int32_decimal = get_decimal_batch(5, 2);
1402        roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
1403        // int64 to store the decimal value
1404        let batch_int64_decimal = get_decimal_batch(12, 2);
1405        roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
1406        // fixed_length_byte_array to store the decimal value
1407        let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
1408        roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
1409    }
1410
1411    #[test]
1412    fn arrow_writer_complex() {
1413        // define schema
1414        let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1415        let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1416        let struct_field_g = Arc::new(Field::new_list(
1417            "g",
1418            Field::new_list_field(DataType::Int16, true),
1419            false,
1420        ));
1421        let struct_field_h = Arc::new(Field::new_list(
1422            "h",
1423            Field::new_list_field(DataType::Int16, false),
1424            true,
1425        ));
1426        let struct_field_e = Arc::new(Field::new_struct(
1427            "e",
1428            vec![
1429                struct_field_f.clone(),
1430                struct_field_g.clone(),
1431                struct_field_h.clone(),
1432            ],
1433            false,
1434        ));
1435        let schema = Schema::new(vec![
1436            Field::new("a", DataType::Int32, false),
1437            Field::new("b", DataType::Int32, true),
1438            Field::new_struct(
1439                "c",
1440                vec![struct_field_d.clone(), struct_field_e.clone()],
1441                false,
1442            ),
1443        ]);
1444
1445        // create some data
1446        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1447        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1448        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1449        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1450
1451        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1452
1453        // Construct a buffer for value offsets, for the nested array:
1454        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
1455        let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1456
1457        // Construct a list array from the above two
1458        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1459            .len(5)
1460            .add_buffer(g_value_offsets.clone())
1461            .add_child_data(g_value.to_data())
1462            .build()
1463            .unwrap();
1464        let g = ListArray::from(g_list_data);
1465        // The difference between g and h is that h has a null bitmap
1466        let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
1467            .len(5)
1468            .add_buffer(g_value_offsets)
1469            .add_child_data(g_value.to_data())
1470            .null_bit_buffer(Some(Buffer::from([0b00011011])))
1471            .build()
1472            .unwrap();
1473        let h = ListArray::from(h_list_data);
1474
1475        let e = StructArray::from(vec![
1476            (struct_field_f, Arc::new(f) as ArrayRef),
1477            (struct_field_g, Arc::new(g) as ArrayRef),
1478            (struct_field_h, Arc::new(h) as ArrayRef),
1479        ]);
1480
1481        let c = StructArray::from(vec![
1482            (struct_field_d, Arc::new(d) as ArrayRef),
1483            (struct_field_e, Arc::new(e) as ArrayRef),
1484        ]);
1485
1486        // build a record batch
1487        let batch = RecordBatch::try_new(
1488            Arc::new(schema),
1489            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
1490        )
1491        .unwrap();
1492
1493        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1494        roundtrip(batch, Some(SMALL_SIZE / 3));
1495    }
1496
1497    #[test]
1498    fn arrow_writer_complex_mixed() {
1499        // This test was added while investigating https://github.com/apache/arrow-rs/issues/244.
1500        // It was subsequently fixed while investigating https://github.com/apache/arrow-rs/issues/245.
1501
1502        // define schema
1503        let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
1504        let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
1505        let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
1506        let schema = Schema::new(vec![Field::new(
1507            "some_nested_object",
1508            DataType::Struct(Fields::from(vec![
1509                offset_field.clone(),
1510                partition_field.clone(),
1511                topic_field.clone(),
1512            ])),
1513            false,
1514        )]);
1515
1516        // create some data
1517        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1518        let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1519        let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
1520
1521        let some_nested_object = StructArray::from(vec![
1522            (offset_field, Arc::new(offset) as ArrayRef),
1523            (partition_field, Arc::new(partition) as ArrayRef),
1524            (topic_field, Arc::new(topic) as ArrayRef),
1525        ]);
1526
1527        // build a record batch
1528        let batch =
1529            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1530
1531        roundtrip(batch, Some(SMALL_SIZE / 2));
1532    }
1533
1534    #[test]
1535    fn arrow_writer_map() {
1536        // Note: we are using the JSON Arrow reader for brevity
1537        let json_content = r#"
1538        {"stocks":{"long": "$AAA", "short": "$BBB"}}
1539        {"stocks":{"long": null, "long": "$CCC", "short": null}}
1540        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1541        "#;
1542        let entries_struct_type = DataType::Struct(Fields::from(vec![
1543            Field::new("key", DataType::Utf8, false),
1544            Field::new("value", DataType::Utf8, true),
1545        ]));
1546        let stocks_field = Field::new(
1547            "stocks",
1548            DataType::Map(
1549                Arc::new(Field::new("entries", entries_struct_type, false)),
1550                false,
1551            ),
1552            true,
1553        );
1554        let schema = Arc::new(Schema::new(vec![stocks_field]));
1555        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
1556        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
1557
1558        let batch = reader.next().unwrap().unwrap();
1559        roundtrip(batch, None);
1560    }
1561
1562    #[test]
1563    fn arrow_writer_2_level_struct() {
1564        // tests writing struct<struct<primitive>>
1565        let field_c = Field::new("c", DataType::Int32, true);
1566        let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
1567        let type_a = DataType::Struct(vec![field_b.clone()].into());
1568        let field_a = Field::new("a", type_a, true);
1569        let schema = Schema::new(vec![field_a.clone()]);
1570
1571        // create data
1572        let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
1573        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
1574            .len(6)
1575            .null_bit_buffer(Some(Buffer::from([0b00100111])))
1576            .add_child_data(c.into_data())
1577            .build()
1578            .unwrap();
1579        let b = StructArray::from(b_data);
1580        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
1581            .len(6)
1582            .null_bit_buffer(Some(Buffer::from([0b00101111])))
1583            .add_child_data(b.into_data())
1584            .build()
1585            .unwrap();
1586        let a = StructArray::from(a_data);
1587
1588        assert_eq!(a.null_count(), 1);
1589        assert_eq!(a.column(0).null_count(), 2);
1590
1591        // build a record batch
1592        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1593
1594        roundtrip(batch, Some(SMALL_SIZE / 2));
1595    }
1596
1597    #[test]
1598    fn arrow_writer_2_level_struct_non_null() {
1599        // tests writing struct<struct<primitive>>
1600        let field_c = Field::new("c", DataType::Int32, false);
1601        let type_b = DataType::Struct(vec![field_c].into());
1602        let field_b = Field::new("b", type_b.clone(), false);
1603        let type_a = DataType::Struct(vec![field_b].into());
1604        let field_a = Field::new("a", type_a.clone(), false);
1605        let schema = Schema::new(vec![field_a]);
1606
1607        // create data
1608        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
1609        let b_data = ArrayDataBuilder::new(type_b)
1610            .len(6)
1611            .add_child_data(c.into_data())
1612            .build()
1613            .unwrap();
1614        let b = StructArray::from(b_data);
1615        let a_data = ArrayDataBuilder::new(type_a)
1616            .len(6)
1617            .add_child_data(b.into_data())
1618            .build()
1619            .unwrap();
1620        let a = StructArray::from(a_data);
1621
1622        assert_eq!(a.null_count(), 0);
1623        assert_eq!(a.column(0).null_count(), 0);
1624
1625        // build a record batch
1626        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1627
1628        roundtrip(batch, Some(SMALL_SIZE / 2));
1629    }
1630
1631    #[test]
1632    fn arrow_writer_2_level_struct_mixed_null() {
1633        // tests writing struct<struct<primitive>>
1634        let field_c = Field::new("c", DataType::Int32, false);
1635        let type_b = DataType::Struct(vec![field_c].into());
1636        let field_b = Field::new("b", type_b.clone(), true);
1637        let type_a = DataType::Struct(vec![field_b].into());
1638        let field_a = Field::new("a", type_a.clone(), false);
1639        let schema = Schema::new(vec![field_a]);
1640
1641        // create data
1642        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
1643        let b_data = ArrayDataBuilder::new(type_b)
1644            .len(6)
1645            .null_bit_buffer(Some(Buffer::from([0b00100111])))
1646            .add_child_data(c.into_data())
1647            .build()
1648            .unwrap();
1649        let b = StructArray::from(b_data);
1650        // `a` intentionally has no null buffer, to test that this is handled correctly
1651        let a_data = ArrayDataBuilder::new(type_a)
1652            .len(6)
1653            .add_child_data(b.into_data())
1654            .build()
1655            .unwrap();
1656        let a = StructArray::from(a_data);
1657
1658        assert_eq!(a.null_count(), 0);
1659        assert_eq!(a.column(0).null_count(), 2);
1660
1661        // build a record batch
1662        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1663
1664        roundtrip(batch, Some(SMALL_SIZE / 2));
1665    }
1666
1667    #[test]
1668    fn arrow_writer_2_level_struct_mixed_null_2() {
1669        // tests writing struct<struct<primitive>>, where the primitive columns are non-null.
1670        let field_c = Field::new("c", DataType::Int32, false);
1671        let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
1672        let field_e = Field::new(
1673            "e",
1674            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1675            false,
1676        );
1677
1678        let field_b = Field::new(
1679            "b",
1680            DataType::Struct(vec![field_c, field_d, field_e].into()),
1681            false,
1682        );
1683        let type_a = DataType::Struct(vec![field_b.clone()].into());
1684        let field_a = Field::new("a", type_a, true);
1685        let schema = Schema::new(vec![field_a.clone()]);
1686
1687        // create data
1688        let c = Int32Array::from_iter_values(0..6);
1689        let d = FixedSizeBinaryArray::try_from_iter(
1690            ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
1691        )
1692        .expect("four byte values");
1693        let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
1694        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
1695            .len(6)
1696            .add_child_data(c.into_data())
1697            .add_child_data(d.into_data())
1698            .add_child_data(e.into_data())
1699            .build()
1700            .unwrap();
1701        let b = StructArray::from(b_data);
1702        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
1703            .len(6)
1704            .null_bit_buffer(Some(Buffer::from([0b00100101])))
1705            .add_child_data(b.into_data())
1706            .build()
1707            .unwrap();
1708        let a = StructArray::from(a_data);
1709
1710        assert_eq!(a.null_count(), 3);
1711        assert_eq!(a.column(0).null_count(), 0);
1712
1713        // build a record batch
1714        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1715
1716        roundtrip(batch, Some(SMALL_SIZE / 2));
1717    }
1718
1719    #[test]
1720    fn test_empty_dict() {
1721        let struct_fields = Fields::from(vec![Field::new(
1722            "dict",
1723            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1724            false,
1725        )]);
1726
1727        let schema = Schema::new(vec![Field::new_struct(
1728            "struct",
1729            struct_fields.clone(),
1730            true,
1731        )]);
1732        let dictionary = Arc::new(DictionaryArray::new(
1733            Int32Array::new_null(5),
1734            Arc::new(StringArray::new_null(0)),
1735        ));
1736
1737        let s = StructArray::new(
1738            struct_fields,
1739            vec![dictionary],
1740            Some(NullBuffer::new_null(5)),
1741        );
1742
1743        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
1744        roundtrip(batch, None);
1745    }

1746    #[test]
1747    fn arrow_writer_page_size() {
1748        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
1749
1750        let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
1751
1752        // Generate an array of 10 unique 10-character strings
1753        for i in 0..10 {
1754            let value = i
1755                .to_string()
1756                .repeat(10)
1757                .chars()
1758                .take(10)
1759                .collect::<String>();
1760
1761            builder.append_value(value);
1762        }
1763
1764        let array = Arc::new(builder.finish());
1765
1766        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
1767
1768        let file = tempfile::tempfile().unwrap();
1769
1770        // Set everything very low so we fall back to PLAIN encoding after the first row
1771        let props = WriterProperties::builder()
1772            .set_data_page_size_limit(1)
1773            .set_dictionary_page_size_limit(1)
1774            .set_write_batch_size(1)
1775            .build();
1776
1777        let mut writer =
1778            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
1779                .expect("Unable to write file");
1780        writer.write(&batch).unwrap();
1781        writer.close().unwrap();
1782
1783        let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
1784
1785        let column = reader.metadata().row_group(0).columns();
1786
1787        assert_eq!(column.len(), 1);
1788
1789        // We should write one row before falling back to PLAIN encoding so there should still be a
1790        // dictionary page.
1791        assert!(
1792            column[0].dictionary_page_offset().is_some(),
1793            "Expected a dictionary page"
1794        );
1795
1796        let offset_indexes = read_offset_indexes(&file, column).unwrap().unwrap();
1797
1798        let page_locations = offset_indexes[0].page_locations.clone();
1799
1800        // We should fall back to PLAIN encoding after the first row, and our max page size is 1 byte,
1801        // so we expect one dictionary encoded page and then a page per row thereafter.
1802        assert_eq!(
1803            page_locations.len(),
1804            10,
1805            "Expected 10 pages but got {page_locations:#?}"
1806        );
1807    }
1808
1809    #[test]
1810    fn arrow_writer_float_nans() {
1811        let f16_field = Field::new("a", DataType::Float16, false);
1812        let f32_field = Field::new("b", DataType::Float32, false);
1813        let f64_field = Field::new("c", DataType::Float64, false);
1814        let schema = Schema::new(vec![f16_field, f32_field, f64_field]);
1815
1816        let f16_values = (0..MEDIUM_SIZE)
1817            .map(|i| {
1818                Some(if i % 2 == 0 {
1819                    f16::NAN
1820                } else {
1821                    f16::from_f32(i as f32)
1822                })
1823            })
1824            .collect::<Float16Array>();
1825
1826        let f32_values = (0..MEDIUM_SIZE)
1827            .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
1828            .collect::<Float32Array>();
1829
1830        let f64_values = (0..MEDIUM_SIZE)
1831            .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
1832            .collect::<Float64Array>();
1833
1834        let batch = RecordBatch::try_new(
1835            Arc::new(schema),
1836            vec![
1837                Arc::new(f16_values),
1838                Arc::new(f32_values),
1839                Arc::new(f64_values),
1840            ],
1841        )
1842        .unwrap();
1843
1844        roundtrip(batch, None);
1845    }
1846
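    // Row counts used by the roundtrip tests below.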
1847    const SMALL_SIZE: usize = 7;
1848    const MEDIUM_SIZE: usize = 63;
1849
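    // Writes `expected_batch` once per writer version (optionally capping the row group size),
    // reads each file back and checks the data matches; returns the written files.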
1850    fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<File> {
1851        let mut files = vec![];
1852        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
1853            let mut props = WriterProperties::builder().set_writer_version(version);
1854
1855            if let Some(size) = max_row_group_size {
1856                props = props.set_max_row_group_size(size)
1857            }
1858
1859            let props = props.build();
1860            files.push(roundtrip_opts(&expected_batch, props))
1861        }
1862        files
1863    }
1864
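    // Writes `expected_batch` to a temporary file with the given properties, reads it back,
    // asserts the schema, row and column counts match, and runs `validate` against each
    // column's expected and actual data.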
1865    fn roundtrip_opts_with_array_validation<F>(
1866        expected_batch: &RecordBatch,
1867        props: WriterProperties,
1868        validate: F,
1869    ) -> File
1870    where
1871        F: Fn(&ArrayData, &ArrayData),
1872    {
1873        let file = tempfile::tempfile().unwrap();
1874
1875        let mut writer = ArrowWriter::try_new(
1876            file.try_clone().unwrap(),
1877            expected_batch.schema(),
1878            Some(props),
1879        )
1880        .expect("Unable to write file");
1881        writer.write(expected_batch).unwrap();
1882        writer.close().unwrap();
1883
1884        let mut record_batch_reader =
1885            ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
1886
1887        let actual_batch = record_batch_reader
1888            .next()
1889            .expect("No batch found")
1890            .expect("Unable to get batch");
1891
1892        assert_eq!(expected_batch.schema(), actual_batch.schema());
1893        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
1894        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
1895        for i in 0..expected_batch.num_columns() {
1896            let expected_data = expected_batch.column(i).to_data();
1897            let actual_data = actual_batch.column(i).to_data();
1898            validate(&expected_data, &actual_data);
1899        }
1900
1901        file
1902    }
1903
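    // Roundtrips `expected_batch`, requiring the read data to be valid and equal to what was written.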
1904    fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> File {
1905        roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
1906            a.validate_full().expect("valid expected data");
1907            b.validate_full().expect("valid actual data");
1908            assert_eq!(a, b)
1909        })
1910    }
1911
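    // Options controlling the single-column roundtrip helpers below.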
1912    struct RoundTripOptions {
1913        values: ArrayRef,
1914        schema: SchemaRef,
1915        bloom_filter: bool,
1916        bloom_filter_position: BloomFilterPosition,
1917    }
1918
1919    impl RoundTripOptions {
1920        fn new(values: ArrayRef, nullable: bool) -> Self {
1921            let data_type = values.data_type().clone();
1922            let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
1923            Self {
1924                values,
1925                schema: Arc::new(schema),
1926                bloom_filter: false,
1927                bloom_filter_position: BloomFilterPosition::AfterRowGroup,
1928            }
1929        }
1930    }
1931
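    // Roundtrips `values` as a single column named "col" with default options.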
1932    fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<File> {
1933        one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
1934    }
1935
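    // Roundtrips `values` as a single column using the supplied schema.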
1936    fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<File> {
1937        let mut options = RoundTripOptions::new(values, false);
1938        options.schema = schema;
1939        one_column_roundtrip_with_options(options)
1940    }
1941
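    // Roundtrips a single column across a matrix of dictionary sizes, encodings (chosen per
    // data type), writer versions and row group sizes.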
1942    fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<File> {
1943        let RoundTripOptions {
1944            values,
1945            schema,
1946            bloom_filter,
1947            bloom_filter_position,
1948        } = options;
1949
1950        let encodings = match values.data_type() {
1951            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
1952                vec![
1953                    Encoding::PLAIN,
1954                    Encoding::DELTA_BYTE_ARRAY,
1955                    Encoding::DELTA_LENGTH_BYTE_ARRAY,
1956                ]
1957            }
1958            DataType::Int64
1959            | DataType::Int32
1960            | DataType::Int16
1961            | DataType::Int8
1962            | DataType::UInt64
1963            | DataType::UInt32
1964            | DataType::UInt16
1965            | DataType::UInt8 => vec![
1966                Encoding::PLAIN,
1967                Encoding::DELTA_BINARY_PACKED,
1968                Encoding::BYTE_STREAM_SPLIT,
1969            ],
1970            DataType::Float32 | DataType::Float64 => {
1971                vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
1972            }
1973            _ => vec![Encoding::PLAIN],
1974        };
1975
1976        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
1977
1978        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
1979
1980        let mut files = vec![];
1981        for dictionary_size in [0, 1, 1024] {
1982            for encoding in &encodings {
1983                for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
1984                    for row_group_size in row_group_sizes {
1985                        let props = WriterProperties::builder()
1986                            .set_writer_version(version)
1987                            .set_max_row_group_size(row_group_size)
1988                            .set_dictionary_enabled(dictionary_size != 0)
1989                            .set_dictionary_page_size_limit(dictionary_size.max(1))
1990                            .set_encoding(*encoding)
1991                            .set_bloom_filter_enabled(bloom_filter)
1992                            .set_bloom_filter_position(bloom_filter_position)
1993                            .build();
1994
1995                        files.push(roundtrip_opts(&expected_batch, props))
1996                    }
1997                }
1998            }
1999        }
2000        files
2001    }
2002
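    // Builds a non-nullable array of type `A` from `iter` and roundtrips it.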
2003    fn values_required<A, I>(iter: I) -> Vec<File>
2004    where
2005        A: From<Vec<I::Item>> + Array + 'static,
2006        I: IntoIterator,
2007    {
2008        let raw_values: Vec<_> = iter.into_iter().collect();
2009        let values = Arc::new(A::from(raw_values));
2010        one_column_roundtrip(values, false)
2011    }
2012
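    // Builds a nullable array of type `A` from `iter`, nulling out every other value,
    // and roundtrips it.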
2013    fn values_optional<A, I>(iter: I) -> Vec<File>
2014    where
2015        A: From<Vec<Option<I::Item>>> + Array + 'static,
2016        I: IntoIterator,
2017    {
2018        let optional_raw_values: Vec<_> = iter
2019            .into_iter()
2020            .enumerate()
2021            .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
2022            .collect();
2023        let optional_values = Arc::new(A::from(optional_raw_values));
2024        one_column_roundtrip(optional_values, true)
2025    }
2026
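    // Runs both the required and the optional roundtrip for the same input.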
2027    fn required_and_optional<A, I>(iter: I)
2028    where
2029        A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
2030        I: IntoIterator + Clone,
2031    {
2032        values_required::<A, I>(iter.clone());
2033        values_optional::<A, I>(iter);
2034    }
2035
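    // Reads the bloom filters for `file_column` from every row group of the first file and
    // asserts that each of `positive_values` is found by some filter and none of
    // `negative_values` is.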
2036    fn check_bloom_filter<T: AsBytes>(
2037        files: Vec<File>,
2038        file_column: String,
2039        positive_values: Vec<T>,
2040        negative_values: Vec<T>,
2041    ) {
2042        files.into_iter().take(1).for_each(|file| {
2043            let file_reader = SerializedFileReader::new_with_options(
2044                file,
2045                ReadOptionsBuilder::new()
2046                    .with_reader_properties(
2047                        ReaderProperties::builder()
2048                            .set_read_bloom_filter(true)
2049                            .build(),
2050                    )
2051                    .build(),
2052            )
2053            .expect("Unable to open file as Parquet");
2054            let metadata = file_reader.metadata();
2055
2056            // Gets bloom filters from all row groups.
2057            let mut bloom_filters: Vec<_> = vec![];
2058            for (ri, row_group) in metadata.row_groups().iter().enumerate() {
2059                if let Some((column_index, _)) = row_group
2060                    .columns()
2061                    .iter()
2062                    .enumerate()
2063                    .find(|(_, column)| column.column_path().string() == file_column)
2064                {
2065                    let row_group_reader = file_reader
2066                        .get_row_group(ri)
2067                        .expect("Unable to read row group");
2068                    if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
2069                        bloom_filters.push(sbbf.clone());
2070                    } else {
2071                        panic!("No bloom filter for column named {file_column} found");
2072                    }
2073                } else {
2074                    panic!("No column named {file_column} found");
2075                }
2076            }
2077
2078            positive_values.iter().for_each(|value| {
2079                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2080                assert!(
2081                    found.is_some(),
2082                    "Value {:?} should be in bloom filter",
2083                    value.as_bytes()
2084                );
2085            });
2086
2087            negative_values.iter().for_each(|value| {
2088                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2089                assert!(
2090                    found.is_none(),
2091                    "Value {:?} should not be in bloom filter",
2092                    value.as_bytes()
2093                );
2094            });
2095        });
2096    }
2097
2098    #[test]
2099    fn all_null_primitive_single_column() {
2100        let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
2101        one_column_roundtrip(values, true);
2102    }

2103    #[test]
2104    fn null_single_column() {
2105        let values = Arc::new(NullArray::new(SMALL_SIZE));
2106        one_column_roundtrip(values, true);
2107        // null arrays are always nullable; a test with non-nullable nulls fails
2108    }
2109
2110    #[test]
2111    fn bool_single_column() {
2112        required_and_optional::<BooleanArray, _>(
2113            [true, false].iter().cycle().copied().take(SMALL_SIZE),
2114        );
2115    }
2116
2117    #[test]
2118    fn bool_large_single_column() {
2119        let values = Arc::new(
2120            [None, Some(true), Some(false)]
2121                .iter()
2122                .cycle()
2123                .copied()
2124                .take(200_000)
2125                .collect::<BooleanArray>(),
2126        );
2127        let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
2128        let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2129        let file = tempfile::tempfile().unwrap();
2130
2131        let mut writer =
2132            ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
2133                .expect("Unable to write file");
2134        writer.write(&expected_batch).unwrap();
2135        writer.close().unwrap();
2136    }
2137
2138    #[test]
2139    fn check_page_offset_index_with_nan() {
2140        let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
2141        let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
2142        let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2143
2144        let mut out = Vec::with_capacity(1024);
2145        let mut writer =
2146            ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
2147        writer.write(&batch).unwrap();
2148        let file_meta_data = writer.close().unwrap();
2149        for row_group in file_meta_data.row_groups {
2150            for column in row_group.columns {
2151                assert!(column.offset_index_offset.is_some());
2152                assert!(column.offset_index_length.is_some());
2153                assert!(column.column_index_offset.is_none());
2154                assert!(column.column_index_length.is_none());
2155            }
2156        }
2157    }
2158
2159    #[test]
2160    fn i8_single_column() {
2161        required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
2162    }
2163
2164    #[test]
2165    fn i16_single_column() {
2166        required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
2167    }
2168
2169    #[test]
2170    fn i32_single_column() {
2171        required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
2172    }
2173
2174    #[test]
2175    fn i64_single_column() {
2176        required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
2177    }
2178
2179    #[test]
2180    fn u8_single_column() {
2181        required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
2182    }
2183
2184    #[test]
2185    fn u16_single_column() {
2186        required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
2187    }
2188
2189    #[test]
2190    fn u32_single_column() {
2191        required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
2192    }
2193
2194    #[test]
2195    fn u64_single_column() {
2196        required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
2197    }
2198
2199    #[test]
2200    fn f32_single_column() {
2201        required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
2202    }
2203
2204    #[test]
2205    fn f64_single_column() {
2206        required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
2207    }
2208
2209    // The timestamp array types don't implement From<Vec<T>> because they need the timezone
2210    // argument, and they also don't support building from a Vec<Option<T>>, so call
2211    // one_column_roundtrip manually instead of calling required_and_optional for these tests.
2212
2213    #[test]
2214    fn timestamp_second_single_column() {
2215        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2216        let values = Arc::new(TimestampSecondArray::from(raw_values));
2217
2218        one_column_roundtrip(values, false);
2219    }
2220
2221    #[test]
2222    fn timestamp_millisecond_single_column() {
2223        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2224        let values = Arc::new(TimestampMillisecondArray::from(raw_values));
2225
2226        one_column_roundtrip(values, false);
2227    }
2228
2229    #[test]
2230    fn timestamp_microsecond_single_column() {
2231        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2232        let values = Arc::new(TimestampMicrosecondArray::from(raw_values));
2233
2234        one_column_roundtrip(values, false);
2235    }
2236
2237    #[test]
2238    fn timestamp_nanosecond_single_column() {
2239        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2240        let values = Arc::new(TimestampNanosecondArray::from(raw_values));
2241
2242        one_column_roundtrip(values, false);
2243    }
2244
2245    #[test]
2246    fn date32_single_column() {
2247        required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
2248    }
2249
2250    #[test]
2251    fn date64_single_column() {
2252        // Date64 must be a multiple of 86400000, see ARROW-10925
2253        required_and_optional::<Date64Array, _>(
2254            (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
2255        );
2256    }
2257
2258    #[test]
2259    fn time32_second_single_column() {
2260        required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
2261    }
2262
2263    #[test]
2264    fn time32_millisecond_single_column() {
2265        required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
2266    }
2267
2268    #[test]
2269    fn time64_microsecond_single_column() {
2270        required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
2271    }
2272
2273    #[test]
2274    fn time64_nanosecond_single_column() {
2275        required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
2276    }
2277
2278    #[test]
2279    #[should_panic(expected = "Converting Duration to parquet not supported")]
2280    fn duration_second_single_column() {
2281        required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
2282    }
2283
2284    #[test]
2285    #[should_panic(expected = "Converting Duration to parquet not supported")]
2286    fn duration_millisecond_single_column() {
2287        required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
2288    }
2289
2290    #[test]
2291    #[should_panic(expected = "Converting Duration to parquet not supported")]
2292    fn duration_microsecond_single_column() {
2293        required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
2294    }
2295
2296    #[test]
2297    #[should_panic(expected = "Converting Duration to parquet not supported")]
2298    fn duration_nanosecond_single_column() {
2299        required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
2300    }
2301
2302    #[test]
2303    fn interval_year_month_single_column() {
2304        required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
2305    }
2306
2307    #[test]
2308    fn interval_day_time_single_column() {
2309        required_and_optional::<IntervalDayTimeArray, _>(vec![
2310            IntervalDayTime::new(0, 1),
2311            IntervalDayTime::new(0, 3),
2312            IntervalDayTime::new(3, -2),
2313            IntervalDayTime::new(-200, 4),
2314        ]);
2315    }
2316
2317    #[test]
2318    #[should_panic(
2319        expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
2320    )]
2321    fn interval_month_day_nano_single_column() {
2322        required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
2323            IntervalMonthDayNano::new(0, 1, 5),
2324            IntervalMonthDayNano::new(0, 3, 2),
2325            IntervalMonthDayNano::new(3, -2, -5),
2326            IntervalMonthDayNano::new(-200, 4, -1),
2327        ]);
2328    }
2329
2330    #[test]
2331    fn binary_single_column() {
2332        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2333        let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2334        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2335
2336        // BinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2337        values_required::<BinaryArray, _>(many_vecs_iter);
2338    }
2339
2340    #[test]
2341    fn binary_view_single_column() {
2342        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2343        let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2344        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2345
2346        // BinaryViewArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2347        values_required::<BinaryViewArray, _>(many_vecs_iter);
2348    }
2349
2350    #[test]
2351    fn i32_column_bloom_filter_at_end() {
2352        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2353        let mut options = RoundTripOptions::new(array, false);
2354        options.bloom_filter = true;
2355        options.bloom_filter_position = BloomFilterPosition::End;
2356
2357        let files = one_column_roundtrip_with_options(options);
2358        check_bloom_filter(
2359            files,
2360            "col".to_string(),
2361            (0..SMALL_SIZE as i32).collect(),
2362            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2363        );
2364    }
2365
2366    #[test]
2367    fn i32_column_bloom_filter() {
2368        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2369        let mut options = RoundTripOptions::new(array, false);
2370        options.bloom_filter = true;
2371
2372        let files = one_column_roundtrip_with_options(options);
2373        check_bloom_filter(
2374            files,
2375            "col".to_string(),
2376            (0..SMALL_SIZE as i32).collect(),
2377            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2378        );
2379    }
2380
2381    #[test]
2382    fn binary_column_bloom_filter() {
2383        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2384        let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2385        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2386
2387        let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
2388        let mut options = RoundTripOptions::new(array, false);
2389        options.bloom_filter = true;
2390
2391        let files = one_column_roundtrip_with_options(options);
2392        check_bloom_filter(
2393            files,
2394            "col".to_string(),
2395            many_vecs,
2396            vec![vec![(SMALL_SIZE + 1) as u8]],
2397        );
2398    }
2399
2400    #[test]
2401    fn empty_string_null_column_bloom_filter() {
2402        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2403        let raw_strs = raw_values.iter().map(|s| s.as_str());
2404
2405        let array = Arc::new(StringArray::from_iter_values(raw_strs));
2406        let mut options = RoundTripOptions::new(array, false);
2407        options.bloom_filter = true;
2408
2409        let files = one_column_roundtrip_with_options(options);
2410
2411        let optional_raw_values: Vec<_> = raw_values
2412            .iter()
2413            .enumerate()
2414            .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
2415            .collect();
2416        // For null slots, the empty string should not be in the bloom filter.
2417        check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
2418    }
2419
2420    #[test]
2421    fn large_binary_single_column() {
2422        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2423        let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2424        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2425
2426        // LargeBinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2427        values_required::<LargeBinaryArray, _>(many_vecs_iter);
2428    }
2429
2430    #[test]
2431    fn fixed_size_binary_single_column() {
2432        let mut builder = FixedSizeBinaryBuilder::new(4);
2433        builder.append_value(b"0123").unwrap();
2434        builder.append_null();
2435        builder.append_value(b"8910").unwrap();
2436        builder.append_value(b"1112").unwrap();
2437        let array = Arc::new(builder.finish());
2438
2439        one_column_roundtrip(array, true);
2440    }
2441
2442    #[test]
2443    fn string_single_column() {
2444        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2445        let raw_strs = raw_values.iter().map(|s| s.as_str());
2446
2447        required_and_optional::<StringArray, _>(raw_strs);
2448    }
2449
2450    #[test]
2451    fn large_string_single_column() {
2452        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2453        let raw_strs = raw_values.iter().map(|s| s.as_str());
2454
2455        required_and_optional::<LargeStringArray, _>(raw_strs);
2456    }
2457
2458    #[test]
2459    fn string_view_single_column() {
2460        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2461        let raw_strs = raw_values.iter().map(|s| s.as_str());
2462
2463        required_and_optional::<StringViewArray, _>(raw_strs);
2464    }
2465
2466    #[test]
2467    fn null_list_single_column() {
2468        let null_field = Field::new_list_field(DataType::Null, true);
2469        let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);
2470
2471        let schema = Schema::new(vec![list_field]);
2472
2473        // Build [[], null, [null, null]]
2474        let a_values = NullArray::new(2);
2475        let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
2476        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2477            DataType::Null,
2478            true,
2479        ))))
2480        .len(3)
2481        .add_buffer(a_value_offsets)
2482        .null_bit_buffer(Some(Buffer::from([0b00000101])))
2483        .add_child_data(a_values.into_data())
2484        .build()
2485        .unwrap();
2486
2487        let a = ListArray::from(a_list_data);
2488
2489        assert!(a.is_valid(0));
2490        assert!(!a.is_valid(1));
2491        assert!(a.is_valid(2));
2492
2493        assert_eq!(a.value(0).len(), 0);
2494        assert_eq!(a.value(2).len(), 2);
2495        assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);
2496
2497        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2498        roundtrip(batch, None);
2499    }
2500
2501    #[test]
2502    fn list_single_column() {
2503        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2504        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2505        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2506            DataType::Int32,
2507            false,
2508        ))))
2509        .len(5)
2510        .add_buffer(a_value_offsets)
2511        .null_bit_buffer(Some(Buffer::from([0b00011011])))
2512        .add_child_data(a_values.into_data())
2513        .build()
2514        .unwrap();
2515
2516        assert_eq!(a_list_data.null_count(), 1);
2517
2518        let a = ListArray::from(a_list_data);
2519        let values = Arc::new(a);
2520
2521        one_column_roundtrip(values, true);
2522    }
2523
2524    #[test]
2525    fn large_list_single_column() {
2526        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2527        let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
2528        let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
2529            "large_item",
2530            DataType::Int32,
2531            true,
2532        ))))
2533        .len(5)
2534        .add_buffer(a_value_offsets)
2535        .add_child_data(a_values.into_data())
2536        .null_bit_buffer(Some(Buffer::from([0b00011011])))
2537        .build()
2538        .unwrap();
2539
2540        // I think this setup is incorrect because this should pass
2541        assert_eq!(a_list_data.null_count(), 1);
2542
2543        let a = LargeListArray::from(a_list_data);
2544        let values = Arc::new(a);
2545
2546        one_column_roundtrip(values, true);
2547    }
2548
2549    #[test]
2550    fn list_nested_nulls() {
2551        use arrow::datatypes::Int32Type;
2552        let data = vec![
2553            Some(vec![Some(1)]),
2554            Some(vec![Some(2), Some(3)]),
2555            None,
2556            Some(vec![Some(4), Some(5), None]),
2557            Some(vec![None]),
2558            Some(vec![Some(6), Some(7)]),
2559        ];
2560
2561        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
2562        one_column_roundtrip(Arc::new(list), true);
2563
2564        let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
2565        one_column_roundtrip(Arc::new(list), true);
2566    }
2567
2568    #[test]
2569    fn struct_single_column() {
2570        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2571        let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
2572        let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
2573
2574        let values = Arc::new(s);
2575        one_column_roundtrip(values, false);
2576    }
2577
2578    #[test]
2579    fn list_and_map_coerced_names() {
2580        // Create map and list with non-Parquet naming
2581        let list_field =
2582            Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
2583        let map_field = Field::new_map(
2584            "my_map",
2585            "entries",
2586            Field::new("keys", DataType::Int32, false),
2587            Field::new("values", DataType::Int32, true),
2588            false,
2589            true,
2590        );
2591
2592        let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
2593        let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();
2594
2595        let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
2596
2597        // Write data to Parquet but coerce names to match spec
2598        let props = Some(WriterProperties::builder().set_coerce_types(true).build());
2599        let file = tempfile::tempfile().unwrap();
2600        let mut writer =
2601            ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();
2602
2603        let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
2604        writer.write(&batch).unwrap();
2605        let file_metadata = writer.close().unwrap();
2606
2607        // Coerced name of "item" should be "element"
2608        assert_eq!(file_metadata.schema[3].name, "element");
2609        // Coerced name of "entries" should be "key_value"
2610        assert_eq!(file_metadata.schema[5].name, "key_value");
2611        // Coerced name of "keys" should be "key"
2612        assert_eq!(file_metadata.schema[6].name, "key");
2613        // Coerced name of "values" should be "value"
2614        assert_eq!(file_metadata.schema[7].name, "value");
2615
2616        // Double check schema after reading from the file
2617        let reader = SerializedFileReader::new(file).unwrap();
2618        let file_schema = reader.metadata().file_metadata().schema();
2619        let fields = file_schema.get_fields();
2620        let list_field = &fields[0].get_fields()[0];
2621        assert_eq!(list_field.get_fields()[0].name(), "element");
2622        let map_field = &fields[1].get_fields()[0];
2623        assert_eq!(map_field.name(), "key_value");
2624        assert_eq!(map_field.get_fields()[0].name(), "key");
2625        assert_eq!(map_field.get_fields()[1].name(), "value");
2626    }
2627
2628    #[test]
2629    fn fallback_flush_data_page() {
2630        // Tests that Fallback::flush_data_page clears all buffers correctly
2631        let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
2632        let values = Arc::new(StringArray::from(raw_values));
2633        let encodings = vec![
2634            Encoding::DELTA_BYTE_ARRAY,
2635            Encoding::DELTA_LENGTH_BYTE_ARRAY,
2636        ];
2637        let data_type = values.data_type().clone();
2638        let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
2639        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2640
2641        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2642        let data_page_size_limit: usize = 32;
2643        let write_batch_size: usize = 16;
2644
2645        for encoding in &encodings {
2646            for row_group_size in row_group_sizes {
2647                let props = WriterProperties::builder()
2648                    .set_writer_version(WriterVersion::PARQUET_2_0)
2649                    .set_max_row_group_size(row_group_size)
2650                    .set_dictionary_enabled(false)
2651                    .set_encoding(*encoding)
2652                    .set_data_page_size_limit(data_page_size_limit)
2653                    .set_write_batch_size(write_batch_size)
2654                    .build();
2655
2656                roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
2657                    let string_array_a = StringArray::from(a.clone());
2658                    let string_array_b = StringArray::from(b.clone());
2659                    let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
2660                    let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
2661                    assert_eq!(
2662                        vec_a, vec_b,
2663                        "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
2664                    );
2665                });
2666            }
2667        }
2668    }
2669
2670    #[test]
2671    fn arrow_writer_string_dictionary() {
2672        // define schema
2673        #[allow(deprecated)]
2674        let schema = Arc::new(Schema::new(vec![Field::new_dict(
2675            "dictionary",
2676            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2677            true,
2678            42,
2679            true,
2680        )]));
2681
2682        // create some data
2683        let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
2684            .iter()
2685            .copied()
2686            .collect();
2687
2688        // build a record batch
2689        one_column_roundtrip_with_schema(Arc::new(d), schema);
2690    }
2691
2692    #[test]
2693    fn arrow_writer_primitive_dictionary() {
2694        // define schema
2695        #[allow(deprecated)]
2696        let schema = Arc::new(Schema::new(vec![Field::new_dict(
2697            "dictionary",
2698            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
2699            true,
2700            42,
2701            true,
2702        )]));
2703
2704        // create some data
2705        let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
2706        builder.append(12345678).unwrap();
2707        builder.append_null();
2708        builder.append(22345678).unwrap();
2709        builder.append(12345678).unwrap();
2710        let d = builder.finish();
2711
2712        one_column_roundtrip_with_schema(Arc::new(d), schema);
2713    }
2714
2715    #[test]
2716    fn arrow_writer_decimal128_dictionary() {
2717        let integers = vec![12345, 56789, 34567];
2718
2719        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
2720
2721        let values = Decimal128Array::from(integers.clone())
2722            .with_precision_and_scale(5, 2)
2723            .unwrap();
2724
2725        let array = DictionaryArray::new(keys, Arc::new(values));
2726        one_column_roundtrip(Arc::new(array.clone()), true);
2727
2728        let values = Decimal128Array::from(integers)
2729            .with_precision_and_scale(12, 2)
2730            .unwrap();
2731
2732        let array = array.with_values(Arc::new(values));
2733        one_column_roundtrip(Arc::new(array), true);
2734    }
2735
2736    #[test]
2737    fn arrow_writer_decimal256_dictionary() {
2738        let integers = vec![
2739            i256::from_i128(12345),
2740            i256::from_i128(56789),
2741            i256::from_i128(34567),
2742        ];
2743
2744        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
2745
2746        let values = Decimal256Array::from(integers.clone())
2747            .with_precision_and_scale(5, 2)
2748            .unwrap();
2749
2750        let array = DictionaryArray::new(keys, Arc::new(values));
2751        one_column_roundtrip(Arc::new(array.clone()), true);
2752
2753        let values = Decimal256Array::from(integers)
2754            .with_precision_and_scale(12, 2)
2755            .unwrap();
2756
2757        let array = array.with_values(Arc::new(values));
2758        one_column_roundtrip(Arc::new(array), true);
2759    }
2760
2761    #[test]
2762    fn arrow_writer_string_dictionary_unsigned_index() {
2763        // define schema
2764        #[allow(deprecated)]
2765        let schema = Arc::new(Schema::new(vec![Field::new_dict(
2766            "dictionary",
2767            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
2768            true,
2769            42,
2770            true,
2771        )]));
2772
2773        // create some data
2774        let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
2775            .iter()
2776            .copied()
2777            .collect();
2778
2779        one_column_roundtrip_with_schema(Arc::new(d), schema);
2780    }
2781
2782    #[test]
2783    fn u32_min_max() {
2784        // check values roundtrip through parquet
2785        let src = [
2786            u32::MIN,
2787            u32::MIN + 1,
2788            (i32::MAX as u32) - 1,
2789            i32::MAX as u32,
2790            (i32::MAX as u32) + 1,
2791            u32::MAX - 1,
2792            u32::MAX,
2793        ];
2794        let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
2795        let files = one_column_roundtrip(values, false);
2796
2797        for file in files {
2798            // check statistics are valid
2799            let reader = SerializedFileReader::new(file).unwrap();
2800            let metadata = reader.metadata();
2801
2802            let mut row_offset = 0;
2803            for row_group in metadata.row_groups() {
2804                assert_eq!(row_group.num_columns(), 1);
2805                let column = row_group.column(0);
2806
2807                let num_values = column.num_values() as usize;
2808                let src_slice = &src[row_offset..row_offset + num_values];
2809                row_offset += column.num_values() as usize;
2810
2811                let stats = column.statistics().unwrap();
2812                if let Statistics::Int32(stats) = stats {
2813                    assert_eq!(
2814                        *stats.min_opt().unwrap() as u32,
2815                        *src_slice.iter().min().unwrap()
2816                    );
2817                    assert_eq!(
2818                        *stats.max_opt().unwrap() as u32,
2819                        *src_slice.iter().max().unwrap()
2820                    );
2821                } else {
2822                    panic!("Statistics::Int32 missing")
2823                }
2824            }
2825        }
2826    }
2827
2828    #[test]
2829    fn u64_min_max() {
2830        // check values roundtrip through parquet
2831        let src = [
2832            u64::MIN,
2833            u64::MIN + 1,
2834            (i64::MAX as u64) - 1,
2835            i64::MAX as u64,
2836            (i64::MAX as u64) + 1,
2837            u64::MAX - 1,
2838            u64::MAX,
2839        ];
2840        let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
2841        let files = one_column_roundtrip(values, false);
2842
2843        for file in files {
2844            // check statistics are valid
2845            let reader = SerializedFileReader::new(file).unwrap();
2846            let metadata = reader.metadata();
2847
2848            let mut row_offset = 0;
2849            for row_group in metadata.row_groups() {
2850                assert_eq!(row_group.num_columns(), 1);
2851                let column = row_group.column(0);
2852
2853                let num_values = column.num_values() as usize;
2854                let src_slice = &src[row_offset..row_offset + num_values];
2855                row_offset += column.num_values() as usize;
2856
2857                let stats = column.statistics().unwrap();
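                // UInt64 is stored as the physical INT64 type, so the statistics come back
                // as Int64 and are reinterpreted as u64 before comparing against the source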
2858                if let Statistics::Int64(stats) = stats {
2859                    assert_eq!(
2860                        *stats.min_opt().unwrap() as u64,
2861                        *src_slice.iter().min().unwrap()
2862                    );
2863                    assert_eq!(
2864                        *stats.max_opt().unwrap() as u64,
2865                        *src_slice.iter().max().unwrap()
2866                    );
2867                } else {
2868                    panic!("Statistics::Int64 missing")
2869                }
2870            }
2871        }
2872    }
2873
2874    #[test]
2875    fn statistics_null_counts_only_nulls() {
2876        // check that null-count statistics are correct for columns containing only NULLs
2877        let values = Arc::new(UInt64Array::from(vec![None, None]));
2878        let files = one_column_roundtrip(values, true);
2879
2880        for file in files {
2881            // check statistics are valid
2882            let reader = SerializedFileReader::new(file).unwrap();
2883            let metadata = reader.metadata();
2884            assert_eq!(metadata.num_row_groups(), 1);
2885            let row_group = metadata.row_group(0);
2886            assert_eq!(row_group.num_columns(), 1);
2887            let column = row_group.column(0);
2888            let stats = column.statistics().unwrap();
2889            assert_eq!(stats.null_count_opt(), Some(2));
2890        }
2891    }
2892
2893    #[test]
2894    fn test_list_of_struct_roundtrip() {
2895        // define schema
2896        let int_field = Field::new("a", DataType::Int32, true);
2897        let int_field2 = Field::new("b", DataType::Int32, true);
2898
2899        let int_builder = Int32Builder::with_capacity(10);
2900        let int_builder2 = Int32Builder::with_capacity(10);
2901
2902        let struct_builder = StructBuilder::new(
2903            vec![int_field, int_field2],
2904            vec![Box::new(int_builder), Box::new(int_builder2)],
2905        );
2906        let mut list_builder = ListBuilder::new(struct_builder);
2907
2908        // Construct the following array
2909        // [{a: 1, b: 2}], [], null, [null, null], [{a: null, b: 3}], [{a: 2, b: null}]
2910
2911        // [{a: 1, b: 2}]
2912        let values = list_builder.values();
2913        values
2914            .field_builder::<Int32Builder>(0)
2915            .unwrap()
2916            .append_value(1);
2917        values
2918            .field_builder::<Int32Builder>(1)
2919            .unwrap()
2920            .append_value(2);
2921        values.append(true);
2922        list_builder.append(true);
2923
2924        // []
2925        list_builder.append(true);
2926
2927        // null
2928        list_builder.append(false);
2929
2930        // [null, null]
2931        let values = list_builder.values();
2932        values
2933            .field_builder::<Int32Builder>(0)
2934            .unwrap()
2935            .append_null();
2936        values
2937            .field_builder::<Int32Builder>(1)
2938            .unwrap()
2939            .append_null();
2940        values.append(false);
2941        values
2942            .field_builder::<Int32Builder>(0)
2943            .unwrap()
2944            .append_null();
2945        values
2946            .field_builder::<Int32Builder>(1)
2947            .unwrap()
2948            .append_null();
2949        values.append(false);
2950        list_builder.append(true);
2951
2952        // [{a: null, b: 3}]
2953        let values = list_builder.values();
2954        values
2955            .field_builder::<Int32Builder>(0)
2956            .unwrap()
2957            .append_null();
2958        values
2959            .field_builder::<Int32Builder>(1)
2960            .unwrap()
2961            .append_value(3);
2962        values.append(true);
2963        list_builder.append(true);
2964
2965        // [{a: 2, b: null}]
2966        let values = list_builder.values();
2967        values
2968            .field_builder::<Int32Builder>(0)
2969            .unwrap()
2970            .append_value(2);
2971        values
2972            .field_builder::<Int32Builder>(1)
2973            .unwrap()
2974            .append_null();
2975        values.append(true);
2976        list_builder.append(true);
2977
2978        let array = Arc::new(list_builder.finish());
2979
2980        one_column_roundtrip(array, true);
2981    }
2982
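    /// Returns the number of rows in each row group of the given Parquet metadata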
2983    fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
2984        metadata.row_groups().iter().map(|x| x.num_rows()).collect()
2985    }
2986
2987    #[test]
2988    fn test_aggregates_records() {
2989        let arrays = [
2990            Int32Array::from((0..100).collect::<Vec<_>>()),
2991            Int32Array::from((0..50).collect::<Vec<_>>()),
2992            Int32Array::from((200..500).collect::<Vec<_>>()),
2993        ];
2994
2995        let schema = Arc::new(Schema::new(vec![Field::new(
2996            "int",
2997            ArrowDataType::Int32,
2998            false,
2999        )]));
3000
3001        let file = tempfile::tempfile().unwrap();
3002
3003        let props = WriterProperties::builder()
3004            .set_max_row_group_size(200)
3005            .build();
3006
3007        let mut writer =
3008            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
3009
3010        for array in arrays {
3011            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
3012            writer.write(&batch).unwrap();
3013        }
3014
3015        writer.close().unwrap();
3016
3017        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
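        // 100 + 50 + 300 = 450 input rows with max_row_group_size = 200
        // should be split into row groups of 200, 200 and 50 rows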
3018        assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);
3019
3020        let batches = builder
3021            .with_batch_size(100)
3022            .build()
3023            .unwrap()
3024            .collect::<ArrowResult<Vec<_>>>()
3025            .unwrap();
3026
3027        assert_eq!(batches.len(), 5);
3028        assert!(batches.iter().all(|x| x.num_columns() == 1));
3029
3030        let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3031
3032        assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
3033
3034        let values: Vec<_> = batches
3035            .iter()
3036            .flat_map(|x| {
3037                x.column(0)
3038                    .as_any()
3039                    .downcast_ref::<Int32Array>()
3040                    .unwrap()
3041                    .values()
3042                    .iter()
3043                    .cloned()
3044            })
3045            .collect();
3046
3047        let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
3048        assert_eq!(&values, &expected_values)
3049    }
3050
3051    #[test]
3052    fn complex_aggregate() {
3053        // Tests aggregating nested data
3054        let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
3055        let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
3056        let struct_a = Arc::new(Field::new(
3057            "struct_a",
3058            DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
3059            true,
3060        ));
3061
3062        let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
3063        let struct_b = Arc::new(Field::new(
3064            "struct_b",
3065            DataType::Struct(vec![list_a.clone()].into()),
3066            false,
3067        ));
3068
3069        let schema = Arc::new(Schema::new(vec![struct_b]));
3070
3071        // create nested data
3072        let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
3073        let field_b_array =
3074            Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);
3075
3076        let struct_a_array = StructArray::from(vec![
3077            (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
3078            (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
3079        ]);
3080
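        // offsets [0, 1, 1, 3, 3, 5] with validity [valid, null, valid, null, valid]
        // produce five lists: [struct 0], null, [structs 1-2], null, [structs 3-4]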
3081        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3082            .len(5)
3083            .add_buffer(Buffer::from_iter(vec![
3084                0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
3085            ]))
3086            .null_bit_buffer(Some(Buffer::from_iter(vec![
3087                true, false, true, false, true,
3088            ])))
3089            .child_data(vec![struct_a_array.into_data()])
3090            .build()
3091            .unwrap();
3092
3093        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3094        let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);
3095
3096        let batch1 =
3097            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3098                .unwrap();
3099
3100        let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
3101        let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);
3102
3103        let struct_a_array = StructArray::from(vec![
3104            (field_a, Arc::new(field_a_array) as ArrayRef),
3105            (field_b, Arc::new(field_b_array) as ArrayRef),
3106        ]);
3107
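        // offsets [0, 4, 5] with no validity buffer produce two non-null lists:
        // [structs 0-3] and [struct 4]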
3108        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3109            .len(2)
3110            .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
3111            .child_data(vec![struct_a_array.into_data()])
3112            .build()
3113            .unwrap();
3114
3115        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3116        let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);
3117
3118        let batch2 =
3119            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3120                .unwrap();
3121
3122        let batches = &[batch1, batch2];
3123
3124        // Verify data is as expected
3125
3126        let expected = r#"
3127            +-------------------------------------------------------------------------------------------------------+
3128            | struct_b                                                                                              |
3129            +-------------------------------------------------------------------------------------------------------+
3130            | {list: [{leaf_a: 1, leaf_b: 1}]}                                                                      |
3131            | {list: }                                                                                              |
3132            | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]}                                               |
3133            | {list: }                                                                                              |
3134            | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]}                                                |
3135            | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
3136            | {list: [{leaf_a: 10, leaf_b: }]}                                                                      |
3137            +-------------------------------------------------------------------------------------------------------+
3138        "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");
3139
3140        let actual = pretty_format_batches(batches).unwrap().to_string();
3141        assert_eq!(actual, expected);
3142
3143        // Write data
3144        let file = tempfile::tempfile().unwrap();
3145        let props = WriterProperties::builder()
3146            .set_max_row_group_size(6)
3147            .build();
3148
3149        let mut writer =
3150            ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
3151
3152        for batch in batches {
3153            writer.write(batch).unwrap();
3154        }
3155        writer.close().unwrap();
3156
3157        // Read data
3158        // The entire first batch and the first row of the second batch should have been written
3159        // to the first row group, leaving a single row in the second row group
3160
3161        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3162        assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);
3163
3164        let batches = builder
3165            .with_batch_size(2)
3166            .build()
3167            .unwrap()
3168            .collect::<ArrowResult<Vec<_>>>()
3169            .unwrap();
3170
3171        assert_eq!(batches.len(), 4);
3172        let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3173        assert_eq!(&batch_counts, &[2, 2, 2, 1]);
3174
3175        let actual = pretty_format_batches(&batches).unwrap().to_string();
3176        assert_eq!(actual, expected);
3177    }
3178
3179    #[test]
3180    fn test_arrow_writer_metadata() {
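        // the writer is created with a file schema carrying key/value metadata, while the
        // batch schema has none; writing such a batch should still succeed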
3181        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3182        let file_schema = batch_schema.clone().with_metadata(
3183            vec![("foo".to_string(), "bar".to_string())]
3184                .into_iter()
3185                .collect(),
3186        );
3187
3188        let batch = RecordBatch::try_new(
3189            Arc::new(batch_schema),
3190            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3191        )
3192        .unwrap();
3193
3194        let mut buf = Vec::with_capacity(1024);
3195        let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
3196        writer.write(&batch).unwrap();
3197        writer.close().unwrap();
3198    }
3199
3200    #[test]
3201    fn test_arrow_writer_nullable() {
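        // the batch declares the column as non-nullable while the file schema declares it
        // nullable; the data should read back with the (nullable) file schema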
3202        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3203        let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
3204        let file_schema = Arc::new(file_schema);
3205
3206        let batch = RecordBatch::try_new(
3207            Arc::new(batch_schema),
3208            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3209        )
3210        .unwrap();
3211
3212        let mut buf = Vec::with_capacity(1024);
3213        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3214        writer.write(&batch).unwrap();
3215        writer.close().unwrap();
3216
3217        let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
3218        let back = read.next().unwrap().unwrap();
3219        assert_eq!(back.schema(), file_schema);
3220        assert_ne!(back.schema(), batch.schema());
3221        assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
3222    }
3223
3224    #[test]
3225    fn in_progress_accounting() {
3226        // define schema
3227        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3228
3229        // create some data
3230        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3231
3232        // build a record batch
3233        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
3234
3235        let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
3236
3237        // starts empty
3238        assert_eq!(writer.in_progress_size(), 0);
3239        assert_eq!(writer.in_progress_rows(), 0);
3240        assert_eq!(writer.memory_size(), 0);
3241        assert_eq!(writer.bytes_written(), 4); // Initial header
3242        writer.write(&batch).unwrap();
3243
3244        // updated on write
3245        let initial_size = writer.in_progress_size();
3246        assert!(initial_size > 0);
3247        assert_eq!(writer.in_progress_rows(), 5);
3248        let initial_memory = writer.memory_size();
3249        assert!(initial_memory > 0);
3250        // the memory estimate is at least as large as the estimated encoded size
3251        assert!(
3252            initial_size <= initial_memory,
3253            "{initial_size} <= {initial_memory}"
3254        );
3255
3256        // updated on second write
3257        writer.write(&batch).unwrap();
3258        assert!(writer.in_progress_size() > initial_size);
3259        assert_eq!(writer.in_progress_rows(), 10);
3260        assert!(writer.memory_size() > initial_memory);
3261        assert!(
3262            writer.in_progress_size() <= writer.memory_size(),
3263            "in_progress_size {} <= memory_size {}",
3264            writer.in_progress_size(),
3265            writer.memory_size()
3266        );
3267
3268        // flushing clears the in-progress tracking, while the total bytes written increases
3269        let pre_flush_bytes_written = writer.bytes_written();
3270        writer.flush().unwrap();
3271        assert_eq!(writer.in_progress_size(), 0);
3272        assert_eq!(writer.memory_size(), 0);
3273        assert!(writer.bytes_written() > pre_flush_bytes_written);
3274
3275        writer.close().unwrap();
3276    }
3277
3278    #[test]
3279    fn test_writer_all_null() {
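        // column "b" contains only nulls; the offset index should still record one page for it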
3280        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3281        let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
3282        let batch = RecordBatch::try_from_iter(vec![
3283            ("a", Arc::new(a) as ArrayRef),
3284            ("b", Arc::new(b) as ArrayRef),
3285        ])
3286        .unwrap();
3287
3288        let mut buf = Vec::with_capacity(1024);
3289        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
3290        writer.write(&batch).unwrap();
3291        writer.close().unwrap();
3292
3293        let bytes = Bytes::from(buf);
3294        let options = ReadOptionsBuilder::new().with_page_index().build();
3295        let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
3296        let index = reader.metadata().offset_index().unwrap();
3297
3298        assert_eq!(index.len(), 1);
3299        assert_eq!(index[0].len(), 2); // 2 columns
3300        assert_eq!(index[0][0].page_locations().len(), 1); // 1 page
3301        assert_eq!(index[0][1].page_locations().len(), 1); // 1 page
3302    }
3303
3304    #[test]
3305    fn test_disabled_statistics_with_page() {
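        // statistics are disabled for the whole file, then re-enabled at page level for
        // column "a" only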
3306        let file_schema = Schema::new(vec![
3307            Field::new("a", DataType::Utf8, true),
3308            Field::new("b", DataType::Utf8, true),
3309        ]);
3310        let file_schema = Arc::new(file_schema);
3311
3312        let batch = RecordBatch::try_new(
3313            file_schema.clone(),
3314            vec![
3315                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3316                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3317            ],
3318        )
3319        .unwrap();
3320
3321        let props = WriterProperties::builder()
3322            .set_statistics_enabled(EnabledStatistics::None)
3323            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
3324            .build();
3325
3326        let mut buf = Vec::with_capacity(1024);
3327        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
3328        writer.write(&batch).unwrap();
3329
3330        let metadata = writer.close().unwrap();
3331        assert_eq!(metadata.row_groups.len(), 1);
3332        let row_group = &metadata.row_groups[0];
3333        assert_eq!(row_group.columns.len(), 2);
3334        // Column "a" has both offset and column index, as requested
3335        assert!(row_group.columns[0].offset_index_offset.is_some());
3336        assert!(row_group.columns[0].column_index_offset.is_some());
3337        // Column "b" should only have offset index
3338        assert!(row_group.columns[1].offset_index_offset.is_some());
3339        assert!(row_group.columns[1].column_index_offset.is_none());
3340
3341        let options = ReadOptionsBuilder::new().with_page_index().build();
3342        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
3343
3344        let row_group = reader.get_row_group(0).unwrap();
3345        let a_col = row_group.metadata().column(0);
3346        let b_col = row_group.metadata().column(1);
3347
3348        // Column chunk of column "a" should have chunk level statistics
3349        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
3350            let min = byte_array_stats.min_opt().unwrap();
3351            let max = byte_array_stats.max_opt().unwrap();
3352
3353            assert_eq!(min.as_bytes(), b"a");
3354            assert_eq!(max.as_bytes(), b"d");
3355        } else {
3356            panic!("expecting Statistics::ByteArray");
3357        }
3358
3359        // The column chunk for column "b" shouldn't have statistics
3360        assert!(b_col.statistics().is_none());
3361
3362        let offset_index = reader.metadata().offset_index().unwrap();
3363        assert_eq!(offset_index.len(), 1); // 1 row group
3364        assert_eq!(offset_index[0].len(), 2); // 2 columns
3365
3366        let column_index = reader.metadata().column_index().unwrap();
3367        assert_eq!(column_index.len(), 1); // 1 row group
3368        assert_eq!(column_index[0].len(), 2); // 2 columns
3369
3370        let a_idx = &column_index[0][0];
3371        assert!(matches!(a_idx, Index::BYTE_ARRAY(_)), "{a_idx:?}");
3372        let b_idx = &column_index[0][1];
3373        assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
3374    }
3375
3376    #[test]
3377    fn test_disabled_statistics_with_chunk() {
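        // statistics are disabled for the whole file, then re-enabled at chunk level for
        // column "a" only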
3378        let file_schema = Schema::new(vec![
3379            Field::new("a", DataType::Utf8, true),
3380            Field::new("b", DataType::Utf8, true),
3381        ]);
3382        let file_schema = Arc::new(file_schema);
3383
3384        let batch = RecordBatch::try_new(
3385            file_schema.clone(),
3386            vec![
3387                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3388                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3389            ],
3390        )
3391        .unwrap();
3392
3393        let props = WriterProperties::builder()
3394            .set_statistics_enabled(EnabledStatistics::None)
3395            .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
3396            .build();
3397
3398        let mut buf = Vec::with_capacity(1024);
3399        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
3400        writer.write(&batch).unwrap();
3401
3402        let metadata = writer.close().unwrap();
3403        assert_eq!(metadata.row_groups.len(), 1);
3404        let row_group = &metadata.row_groups[0];
3405        assert_eq!(row_group.columns.len(), 2);
3406        // Column "a" should only have offset index
3407        assert!(row_group.columns[0].offset_index_offset.is_some());
3408        assert!(row_group.columns[0].column_index_offset.is_none());
3409        // Column "b" should only have offset index
3410        assert!(row_group.columns[1].offset_index_offset.is_some());
3411        assert!(row_group.columns[1].column_index_offset.is_none());
3412
3413        let options = ReadOptionsBuilder::new().with_page_index().build();
3414        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
3415
3416        let row_group = reader.get_row_group(0).unwrap();
3417        let a_col = row_group.metadata().column(0);
3418        let b_col = row_group.metadata().column(1);
3419
3420        // Column chunk of column "a" should have chunk level statistics
3421        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
3422            let min = byte_array_stats.min_opt().unwrap();
3423            let max = byte_array_stats.max_opt().unwrap();
3424
3425            assert_eq!(min.as_bytes(), b"a");
3426            assert_eq!(max.as_bytes(), b"d");
3427        } else {
3428            panic!("expecting Statistics::ByteArray");
3429        }
3430
3431        // The column chunk for column "b" shouldn't have statistics
3432        assert!(b_col.statistics().is_none());
3433
3434        let column_index = reader.metadata().column_index().unwrap();
3435        assert_eq!(column_index.len(), 1); // 1 row group
3436        assert_eq!(column_index[0].len(), 2); // 2 columns
3437
3438        let a_idx = &column_index[0][0];
3439        assert!(matches!(a_idx, Index::NONE), "{a_idx:?}");
3440        let b_idx = &column_index[0][1];
3441        assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
3442    }
3443
3444    #[test]
3445    fn test_arrow_writer_skip_metadata() {
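        // with skip_arrow_metadata set, the encoded Arrow schema must not be stored in the
        // file's key/value metadata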
3446        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3447        let file_schema = Arc::new(batch_schema.clone());
3448
3449        let batch = RecordBatch::try_new(
3450            Arc::new(batch_schema),
3451            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3452        )
3453        .unwrap();
3454        let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
3455
3456        let mut buf = Vec::with_capacity(1024);
3457        let mut writer =
3458            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
3459        writer.write(&batch).unwrap();
3460        writer.close().unwrap();
3461
3462        let bytes = Bytes::from(buf);
3463        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
3464        assert_eq!(file_schema, *reader_builder.schema());
3465        if let Some(key_value_metadata) = reader_builder
3466            .metadata()
3467            .file_metadata()
3468            .key_value_metadata()
3469        {
3470            assert!(!key_value_metadata
3471                .iter()
3472                .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY));
3473        }
3474    }
3475
3476    #[test]
3477    fn mismatched_schemas() {
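        // writing a batch whose column type (Int32) doesn't match the file schema (Float64)
        // should produce an error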
3478        let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
3479        let file_schema = Arc::new(Schema::new(vec![Field::new(
3480            "temperature",
3481            DataType::Float64,
3482            false,
3483        )]));
3484
3485        let batch = RecordBatch::try_new(
3486            Arc::new(batch_schema),
3487            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3488        )
3489        .unwrap();
3490
3491        let mut buf = Vec::with_capacity(1024);
3492        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3493
3494        let err = writer.write(&batch).unwrap_err().to_string();
3495        assert_eq!(
3496            err,
3497            "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
3498        );
3499    }
3500
3501    #[test]
3502    // https://github.com/apache/arrow-rs/issues/6988
3503    fn test_roundtrip_empty_schema() {
3504        // create empty record batch with empty schema
3505        let empty_batch = RecordBatch::try_new_with_options(
3506            Arc::new(Schema::empty()),
3507            vec![],
3508            &RecordBatchOptions::default().with_row_count(Some(0)),
3509        )
3510        .unwrap();
3511
3512        // write to parquet
3513        let mut parquet_bytes: Vec<u8> = Vec::new();
3514        let mut writer =
3515            ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
3516        writer.write(&empty_batch).unwrap();
3517        writer.close().unwrap();
3518
3519        // read from parquet
3520        let bytes = Bytes::from(parquet_bytes);
3521        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
3522        assert_eq!(reader.schema(), &empty_batch.schema());
3523        let batches: Vec<_> = reader
3524            .build()
3525            .unwrap()
3526            .collect::<ArrowResult<Vec<_>>>()
3527            .unwrap();
3528        assert_eq!(batches.len(), 0);
3529    }
3530}