parquet/arrow/arrow_writer/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Contains writer which writes arrow data into parquet data.

use bytes::Bytes;
use std::io::{Read, Write};
use std::iter::Peekable;
use std::slice::Iter;
use std::sync::{Arc, Mutex};
use std::vec::IntoIter;
use thrift::protocol::TCompactOutputProtocol;

use arrow_array::cast::AsArray;
use arrow_array::types::*;
use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};

use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};

use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
use crate::arrow::ArrowSchemaConverter;
use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
use crate::column::page_encryption::PageEncryptor;
use crate::column::writer::encoder::ColumnValueEncoder;
use crate::column::writer::{
    get_column_writer, ColumnCloseResult, ColumnWriter, GenericColumnWriter,
};
use crate::data_type::{ByteArray, FixedLenByteArray};
#[cfg(feature = "encryption")]
use crate::encryption::encrypt::FileEncryptor;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{KeyValue, RowGroupMetaData};
use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
use crate::file::reader::{ChunkReader, Length};
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
use crate::thrift::TSerializable;
use levels::{calculate_array_levels, ArrayLevels};

mod byte_array;
mod levels;

/// Encodes [`RecordBatch`] to parquet
///
/// Writes Arrow `RecordBatch`es to a Parquet writer. Multiple [`RecordBatch`] will be encoded
/// to the same row group, up to `max_row_group_size` rows. Any remaining rows will be
/// flushed on close, leading the final row group in the output file to potentially
/// contain fewer than `max_row_group_size` rows.
///
/// # Example: Writing `RecordBatch`es
/// ```
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// # use arrow_array::{ArrayRef, Int64Array};
/// # use arrow_array::RecordBatch;
/// # use parquet::arrow::arrow_writer::ArrowWriter;
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
/// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
///
/// let mut buffer = Vec::new();
/// let mut writer = ArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
/// writer.write(&to_write).unwrap();
/// writer.close().unwrap();
///
/// let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap();
/// let read = reader.next().unwrap().unwrap();
///
/// assert_eq!(to_write, read);
/// ```
///
/// # Memory Usage and Limiting
///
/// The nature of Parquet requires buffering of an entire row group before it can
/// be flushed to the underlying writer. Data is mostly buffered in its encoded
/// form, reducing memory usage. However, some data such as dictionary keys,
/// large strings or very nested data may still result in non-trivial memory
/// usage.
///
/// See Also:
/// * [`ArrowWriter::memory_size`]: the current memory usage of the writer.
/// * [`ArrowWriter::in_progress_size`]: the estimated size of the buffered row group.
///
/// Call [`Self::flush`] to trigger an early flush of a row group based on a
/// memory threshold and/or global memory pressure. However, smaller row groups
/// result in higher metadata overheads, and thus may worsen compression ratios
/// and query performance.
///
/// ```no_run
/// # use std::io::Write;
/// # use arrow_array::RecordBatch;
/// # use parquet::arrow::ArrowWriter;
/// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
/// # let batch: RecordBatch = todo!();
/// writer.write(&batch).unwrap();
/// // Trigger an early flush if anticipated size exceeds 1_000_000
/// if writer.in_progress_size() > 1_000_000 {
///     writer.flush().unwrap();
/// }
/// ```
///
/// ## Type Support
///
/// The writer supports writing all Arrow [`DataType`]s that have a direct mapping to
/// Parquet types including [`StructArray`] and [`ListArray`].
///
/// The following are not supported:
///
/// * [`IntervalMonthDayNanoArray`]: Parquet does not [support nanosecond intervals].
///
/// [`DataType`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html
/// [`StructArray`]: https://docs.rs/arrow/latest/arrow/array/struct.StructArray.html
/// [`ListArray`]: https://docs.rs/arrow/latest/arrow/array/type.ListArray.html
/// [`IntervalMonthDayNanoArray`]: https://docs.rs/arrow/latest/arrow/array/type.IntervalMonthDayNanoArray.html
/// [support nanosecond intervals]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
pub struct ArrowWriter<W: Write> {
    /// Underlying Parquet writer
    writer: SerializedFileWriter<W>,

    /// The in-progress row group if any
    in_progress: Option<ArrowRowGroupWriter>,

    /// A copy of the Arrow schema.
    ///
    /// The schema is used to verify that each record batch written has the correct schema
    arrow_schema: SchemaRef,

    /// Creates new [`ArrowRowGroupWriter`] instances as required
    row_group_writer_factory: ArrowRowGroupWriterFactory,

    /// The length of arrays to write to each row group
    max_row_group_size: usize,
}

impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let buffered_memory = self.in_progress_size();
        f.debug_struct("ArrowWriter")
            .field("writer", &self.writer)
            .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
            .field("in_progress_rows", &self.in_progress_rows())
            .field("arrow_schema", &self.arrow_schema)
            .field("max_row_group_size", &self.max_row_group_size)
            .finish()
    }
}

impl<W: Write + Send> ArrowWriter<W> {
    /// Try to create a new Arrow writer
    ///
    /// The writer will fail if:
    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
    ///  * the Arrow schema contains unsupported datatypes such as Unions
    pub fn try_new(
        writer: W,
        arrow_schema: SchemaRef,
        props: Option<WriterProperties>,
    ) -> Result<Self> {
        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
        Self::try_new_with_options(writer, arrow_schema, options)
    }

    /// Try to create a new Arrow writer with [`ArrowWriterOptions`].
    ///
    /// The writer will fail if:
    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
    ///  * the Arrow schema contains unsupported datatypes such as Unions
    pub fn try_new_with_options(
        writer: W,
        arrow_schema: SchemaRef,
        options: ArrowWriterOptions,
    ) -> Result<Self> {
        let mut props = options.properties;
        let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
        if let Some(schema_root) = &options.schema_root {
            converter = converter.schema_root(schema_root);
        }
        let schema = converter.convert(&arrow_schema)?;
        if !options.skip_arrow_metadata {
            // add serialized arrow schema
            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
        }

        let max_row_group_size = props.max_row_group_size();

        let file_writer =
            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::new(props))?;

        let row_group_writer_factory = ArrowRowGroupWriterFactory::new(&file_writer);

        Ok(Self {
            writer: file_writer,
            in_progress: None,
            arrow_schema,
            row_group_writer_factory,
            max_row_group_size,
        })
    }

    /// Returns metadata for any flushed row groups
    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
        self.writer.flushed_row_groups()
    }

    /// Estimated memory usage, in bytes, of this `ArrowWriter`
    ///
    /// This estimate is formed by summing the values of
    /// [`ArrowColumnWriter::memory_size`] for all in-progress columns.
    pub fn memory_size(&self) -> usize {
        match &self.in_progress {
            Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
            None => 0,
        }
    }

    /// Anticipated encoded size of the in-progress row group.
    ///
    /// This estimates the size of the row group once it has been completely
    /// encoded, and is formed by summing the values of
    /// [`ArrowColumnWriter::get_estimated_total_bytes`] for all in-progress
    /// columns.
    pub fn in_progress_size(&self) -> usize {
        match &self.in_progress {
            Some(in_progress) => in_progress
                .writers
                .iter()
                .map(|x| x.get_estimated_total_bytes())
                .sum(),
            None => 0,
        }
    }

    /// Returns the number of rows buffered in the in-progress row group
    pub fn in_progress_rows(&self) -> usize {
        self.in_progress
            .as_ref()
            .map(|x| x.buffered_rows)
            .unwrap_or_default()
    }

    /// Returns the number of bytes written by this instance
    pub fn bytes_written(&self) -> usize {
        self.writer.bytes_written()
    }

    /// Encodes the provided [`RecordBatch`]
    ///
    /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`]
    /// rows, the contents of `batch` will be written to one or more row groups such that all but
    /// the final row group in the file contain [`WriterProperties::max_row_group_size`] rows.
    ///
    /// This will fail if the `batch`'s schema does not match the writer's schema.
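    ///
    /// # Example: splitting a batch across row groups
    ///
    /// A brief sketch of the behaviour described above; the tiny row group size
    /// of 4 is chosen purely for illustration:
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    /// # use parquet::arrow::ArrowWriter;
    /// # use parquet::file::properties::WriterProperties;
    /// let col = Arc::new(Int32Array::from_iter_values(0..10)) as ArrayRef;
    /// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
    ///
    /// let props = WriterProperties::builder().set_max_row_group_size(4).build();
    /// let mut writer = ArrowWriter::try_new(Vec::new(), batch.schema(), Some(props)).unwrap();
    /// writer.write(&batch).unwrap();
    ///
    /// // 10 rows with a limit of 4 rows per row group yields row groups of 4, 4 and 2 rows
    /// let metadata = writer.close().unwrap();
    /// assert_eq!(metadata.row_groups.len(), 3);
    /// ```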
    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        if batch.num_rows() == 0 {
            return Ok(());
        }

        let in_progress = match &mut self.in_progress {
            Some(in_progress) => in_progress,
            x => x.insert(self.row_group_writer_factory.create_row_group_writer(
                self.writer.schema_descr(),
                self.writer.properties(),
                &self.arrow_schema,
                self.writer.flushed_row_groups().len(),
            )?),
        };

        // If this would exceed max_row_group_size, split the batch
        if in_progress.buffered_rows + batch.num_rows() > self.max_row_group_size {
            let to_write = self.max_row_group_size - in_progress.buffered_rows;
            let a = batch.slice(0, to_write);
            let b = batch.slice(to_write, batch.num_rows() - to_write);
            self.write(&a)?;
            return self.write(&b);
        }

        in_progress.write(batch)?;

        if in_progress.buffered_rows >= self.max_row_group_size {
            self.flush()?
        }
        Ok(())
    }

    /// Writes the given `buf` bytes to the internal buffer.
    ///
    /// It's safe to use this method to write data to the underlying writer,
    /// because it will ensure that the buffering and byte-counting layers are used.
    pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
        self.writer.write_all(buf)
    }

    /// Flushes all buffered rows into a new row group
    pub fn flush(&mut self) -> Result<()> {
        let in_progress = match self.in_progress.take() {
            Some(in_progress) => in_progress,
            None => return Ok(()),
        };

        let mut row_group_writer = self.writer.next_row_group()?;
        for chunk in in_progress.close()? {
            chunk.append_to_row_group(&mut row_group_writer)?;
        }
        row_group_writer.close()?;
        Ok(())
    }

    /// Additional [`KeyValue`] metadata to be written in addition to those from [`WriterProperties`]
    ///
    /// This method provides a way to append key-value metadata after writing `RecordBatch`es.
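    ///
    /// # Example
    ///
    /// A minimal sketch; the key and value shown are arbitrary placeholders:
    /// ```no_run
    /// # use parquet::arrow::ArrowWriter;
    /// # use parquet::file::metadata::KeyValue;
    /// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
    /// // record extra file-level metadata once the data has been written
    /// writer.append_key_value_metadata(KeyValue::new("my_key".to_string(), "my_value".to_string()));
    /// ```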
    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
        self.writer.append_key_value_metadata(kv_metadata)
    }

    /// Returns a reference to the underlying writer.
    pub fn inner(&self) -> &W {
        self.writer.inner()
    }

    /// Returns a mutable reference to the underlying writer.
    ///
    /// **Warning**: if you write directly to this writer, you will skip
    /// the `TrackedWrite` buffering and byte-counting layers. This will cause
    /// the file footer's recorded offsets and sizes to diverge from reality,
    /// resulting in an unreadable or corrupted Parquet file.
    ///
    /// If you want to write safely to the underlying writer, use [`Self::write_all`].
    pub fn inner_mut(&mut self) -> &mut W {
        self.writer.inner_mut()
    }

    /// Flushes any outstanding data and returns the underlying writer.
    pub fn into_inner(mut self) -> Result<W> {
        self.flush()?;
        self.writer.into_inner()
    }

    /// Close and finalize the underlying Parquet writer
    ///
    /// Unlike [`Self::close`] this does not consume self
    ///
    /// Attempting to write after calling finish will result in an error
    pub fn finish(&mut self) -> Result<crate::format::FileMetaData> {
        self.flush()?;
        self.writer.finish()
    }

    /// Close and finalize the underlying Parquet writer
    pub fn close(mut self) -> Result<crate::format::FileMetaData> {
        self.finish()
    }
}

impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch).map_err(|e| e.into())
    }

    fn close(self) -> std::result::Result<(), ArrowError> {
        self.close()?;
        Ok(())
    }
}

/// Arrow-specific configuration settings for writing parquet files.
///
/// See [`ArrowWriter`] for how to configure the writer.
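///
/// # Example
///
/// A brief sketch of configuring options and creating a writer with them,
/// assuming an in-memory `Vec<u8>` as the output:
/// ```
/// # use std::sync::Arc;
/// # use arrow_schema::{DataType, Field, Schema};
/// # use parquet::arrow::arrow_writer::{ArrowWriter, ArrowWriterOptions};
/// # use parquet::file::properties::WriterProperties;
/// let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
///
/// // e.g. skip embedding the serialized arrow schema in the file metadata
/// let options = ArrowWriterOptions::new()
///     .with_properties(WriterProperties::default())
///     .with_skip_arrow_metadata(true);
///
/// let _writer = ArrowWriter::try_new_with_options(Vec::new(), schema, options).unwrap();
/// ```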
#[derive(Debug, Clone, Default)]
pub struct ArrowWriterOptions {
    properties: WriterProperties,
    skip_arrow_metadata: bool,
    schema_root: Option<String>,
}

impl ArrowWriterOptions {
    /// Creates a new [`ArrowWriterOptions`] with the default settings.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the [`WriterProperties`] for writing parquet files.
    pub fn with_properties(self, properties: WriterProperties) -> Self {
        Self { properties, ..self }
    }

    /// Skip encoding the embedded arrow metadata (defaults to `false`)
    ///
    /// Parquet files generated by the [`ArrowWriter`] contain embedded arrow schema
    /// by default.
    ///
    /// Set `skip_arrow_metadata` to `true` to skip encoding the embedded metadata.
    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }

    /// Set the name of the root parquet schema element (defaults to `"arrow_schema"`)
    pub fn with_schema_root(self, schema_root: String) -> Self {
        Self {
            schema_root: Some(schema_root),
            ..self
        }
    }
}

/// A single column chunk produced by [`ArrowColumnWriter`]
#[derive(Default)]
struct ArrowColumnChunkData {
    length: usize,
    data: Vec<Bytes>,
}

impl Length for ArrowColumnChunkData {
    fn len(&self) -> u64 {
        self.length as _
    }
}

impl ChunkReader for ArrowColumnChunkData {
    type T = ArrowColumnChunkReader;

    fn get_read(&self, start: u64) -> Result<Self::T> {
        assert_eq!(start, 0); // Assume append_column writes all data in one-shot
        Ok(ArrowColumnChunkReader(
            self.data.clone().into_iter().peekable(),
        ))
    }

    fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
        unimplemented!()
    }
}

/// A [`Read`] for [`ArrowColumnChunkData`]
struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);

impl Read for ArrowColumnChunkReader {
    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
        let buffer = loop {
            match self.0.peek_mut() {
                Some(b) if b.is_empty() => {
                    self.0.next();
                    continue;
                }
                Some(b) => break b,
                None => return Ok(0),
            }
        };

        let len = buffer.len().min(out.len());
        let b = buffer.split_to(len);
        out[..len].copy_from_slice(&b);
        Ok(len)
    }
}

/// A shared [`ArrowColumnChunkData`]
///
/// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access via
/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows
type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;

#[derive(Default)]
struct ArrowPageWriter {
    buffer: SharedColumnChunk,
    #[cfg(feature = "encryption")]
    page_encryptor: Option<PageEncryptor>,
}

impl ArrowPageWriter {
    #[cfg(feature = "encryption")]
    pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
        self.page_encryptor = page_encryptor;
        self
    }

    #[cfg(feature = "encryption")]
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        self.page_encryptor.as_mut()
    }

    #[cfg(not(feature = "encryption"))]
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        None
    }
}

impl PageWriter for ArrowPageWriter {
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
        let page = match self.page_encryptor_mut() {
            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
            None => page,
        };

        let page_header = page.to_thrift_header();
        let header = {
            let mut header = Vec::with_capacity(1024);

            match self.page_encryptor_mut() {
                Some(page_encryptor) => {
                    page_encryptor.encrypt_page_header(&page_header, &mut header)?;
                    if page.compressed_page().is_data_page() {
                        page_encryptor.increment_page();
                    }
                }
                None => {
                    let mut protocol = TCompactOutputProtocol::new(&mut header);
                    page_header.write_to_out_protocol(&mut protocol)?;
                }
            };

            Bytes::from(header)
        };

        let mut buf = self.buffer.try_lock().unwrap();

        let data = page.compressed_page().buffer().clone();
        let compressed_size = data.len() + header.len();

        let mut spec = PageWriteSpec::new();
        spec.page_type = page.page_type();
        spec.num_values = page.num_values();
        spec.uncompressed_size = page.uncompressed_size() + header.len();
        spec.offset = buf.length as u64;
        spec.compressed_size = compressed_size;
        spec.bytes_written = compressed_size as u64;

        buf.length += compressed_size;
        buf.data.push(header);
        buf.data.push(data);

        Ok(spec)
    }

    fn close(&mut self) -> Result<()> {
        Ok(())
    }
}

/// A leaf column that can be encoded by [`ArrowColumnWriter`]
#[derive(Debug)]
pub struct ArrowLeafColumn(ArrayLevels);

/// Computes the [`ArrowLeafColumn`] for a potentially nested [`ArrayRef`]
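///
/// # Example
///
/// A brief sketch: a flat, non-nullable `Int32` column produces exactly one leaf.
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int32Array};
/// # use arrow_schema::{DataType, Field};
/// # use parquet::arrow::arrow_writer::compute_leaves;
/// let field = Field::new("x", DataType::Int32, false);
/// let array = Arc::new(Int32Array::from_iter_values([1, 2, 3])) as ArrayRef;
/// let leaves = compute_leaves(&field, &array).unwrap();
/// assert_eq!(leaves.len(), 1);
/// ```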
pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
    let levels = calculate_array_levels(array, field)?;
    Ok(levels.into_iter().map(ArrowLeafColumn).collect())
}

/// The data for a single column chunk, see [`ArrowColumnWriter`]
pub struct ArrowColumnChunk {
    data: ArrowColumnChunkData,
    close: ColumnCloseResult,
}

impl std::fmt::Debug for ArrowColumnChunk {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowColumnChunk")
            .field("length", &self.data.length)
            .finish_non_exhaustive()
    }
}

impl ArrowColumnChunk {
    /// Calls [`SerializedRowGroupWriter::append_column`] with this column's data
    pub fn append_to_row_group<W: Write + Send>(
        self,
        writer: &mut SerializedRowGroupWriter<'_, W>,
    ) -> Result<()> {
        writer.append_column(&self.data, self.close)
    }
}

/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
///
/// Note: This is a low-level interface for applications that require
/// fine-grained control of encoding (e.g. encoding using multiple threads),
/// see [`ArrowWriter`] for a higher-level interface
///
/// # Example: Encoding two Arrow Arrays in Parallel
/// ```
/// // The arrow schema
/// # use std::sync::Arc;
/// # use arrow_array::*;
/// # use arrow_schema::*;
/// # use parquet::arrow::ArrowSchemaConverter;
/// # use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, get_column_writers, ArrowColumnChunk};
/// # use parquet::file::properties::WriterProperties;
/// # use parquet::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
/// #
/// let schema = Arc::new(Schema::new(vec![
///     Field::new("i32", DataType::Int32, false),
///     Field::new("f32", DataType::Float32, false),
/// ]));
///
/// // Compute the parquet schema
/// let props = Arc::new(WriterProperties::default());
/// let parquet_schema = ArrowSchemaConverter::new()
///   .with_coerce_types(props.coerce_types())
///   .convert(&schema)
///   .unwrap();
///
/// // Create writers for each of the leaf columns
/// let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
///
/// // Spawn a worker thread for each column
/// //
/// // Note: This is for demonstration purposes; a thread pool, e.g. rayon or tokio, would be better.
/// // The `map` produces an iterator of `(thread handle, send channel)` tuples.
/// let mut workers: Vec<_> = col_writers
///     .into_iter()
///     .map(|mut col_writer| {
///         let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
///         let handle = std::thread::spawn(move || {
///             // receive Arrays to encode via the channel
///             for col in recv {
///                 col_writer.write(&col)?;
///             }
///             // once the input is complete, close the writer
///             // to return the newly created ArrowColumnChunk
///             col_writer.close()
///         });
///         (handle, send)
///     })
///     .collect();
///
/// // Create parquet writer
/// let root_schema = parquet_schema.root_schema_ptr();
/// // write to memory in the example, but this could be a File
/// let mut out = Vec::with_capacity(1024);
/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone())
///   .unwrap();
///
/// // Start row group
/// let mut row_group_writer: SerializedRowGroupWriter<'_, _> = writer
///   .next_row_group()
///   .unwrap();
///
/// // Create some example input columns to encode
/// let to_write = vec![
///     Arc::new(Int32Array::from_iter_values([1, 2, 3])) as _,
///     Arc::new(Float32Array::from_iter_values([1., 45., -1.])) as _,
/// ];
///
/// // Send the input columns to the workers
/// let mut worker_iter = workers.iter_mut();
/// for (arr, field) in to_write.iter().zip(&schema.fields) {
///     for leaves in compute_leaves(field, arr).unwrap() {
///         worker_iter.next().unwrap().1.send(leaves).unwrap();
///     }
/// }
///
/// // Wait for the workers to complete encoding, and append
/// // the resulting column chunks to the row group (and the file)
/// for (handle, send) in workers {
///     drop(send); // Drop send side to signal termination
///     // wait for the worker to send the completed chunk
///     let chunk: ArrowColumnChunk = handle.join().unwrap().unwrap();
///     chunk.append_to_row_group(&mut row_group_writer).unwrap();
/// }
/// // Close the row group which writes to the underlying file
/// row_group_writer.close().unwrap();
///
/// let metadata = writer.close().unwrap();
/// assert_eq!(metadata.num_rows, 3);
/// ```
pub struct ArrowColumnWriter {
    writer: ArrowColumnWriterImpl,
    chunk: SharedColumnChunk,
}

impl std::fmt::Debug for ArrowColumnWriter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
    }
}

enum ArrowColumnWriterImpl {
    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
    Column(ColumnWriter<'static>),
}

impl ArrowColumnWriter {
    /// Write an [`ArrowLeafColumn`]
    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
        match &mut self.writer {
            ArrowColumnWriterImpl::Column(c) => {
                write_leaf(c, &col.0)?;
            }
            ArrowColumnWriterImpl::ByteArray(c) => {
                write_primitive(c, col.0.array().as_ref(), &col.0)?;
            }
        }
        Ok(())
    }

    /// Close this column returning the written [`ArrowColumnChunk`]
    pub fn close(self) -> Result<ArrowColumnChunk> {
        let close = match self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
            ArrowColumnWriterImpl::Column(c) => c.close()?,
        };
        let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
        let data = chunk.into_inner().unwrap();
        Ok(ArrowColumnChunk { data, close })
    }

    /// Returns the estimated total memory usage by the writer.
    ///
    /// Unlike [`Self::get_estimated_total_bytes`], this is an estimate
    /// of the current memory usage and not its anticipated encoded size.
    ///
    /// This includes:
    /// 1. Data buffered in encoded form
    /// 2. Data buffered in un-encoded form (e.g. `usize` dictionary keys)
    ///
    /// This value should be greater than or equal to [`Self::get_estimated_total_bytes`]
    pub fn memory_size(&self) -> usize {
        match &self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
            ArrowColumnWriterImpl::Column(c) => c.memory_size(),
        }
    }

    /// Returns the estimated total encoded bytes for this column writer.
    ///
    /// This includes:
    /// 1. Data buffered in encoded form
    /// 2. An estimate of how large the data buffered in un-encoded form would be once encoded
    ///
    /// This value should be less than or equal to [`Self::memory_size`]
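    ///
    /// # Example
    ///
    /// A brief sketch comparing the two estimates for a single leaf writer; the
    /// schema and data are placeholders chosen only for illustration:
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array};
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::ArrowSchemaConverter;
    /// # use parquet::arrow::arrow_writer::{compute_leaves, get_column_writers};
    /// # use parquet::file::properties::WriterProperties;
    /// let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
    /// let props = Arc::new(WriterProperties::default());
    /// let parquet_schema = ArrowSchemaConverter::new().convert(&schema).unwrap();
    /// let mut writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
    ///
    /// // buffer some data in the single leaf writer
    /// let array = Arc::new(Int32Array::from_iter_values(0..1024)) as ArrayRef;
    /// for leaf in compute_leaves(schema.field(0), &array).unwrap() {
    ///     writers[0].write(&leaf).unwrap();
    /// }
    ///
    /// let memory = writers[0].memory_size();
    /// let encoded_estimate = writers[0].get_estimated_total_bytes();
    /// println!("buffered: {memory} bytes, anticipated encoded size: {encoded_estimate} bytes");
    /// ```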
    pub fn get_estimated_total_bytes(&self) -> usize {
        match &self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
            ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
        }
    }
}

/// Encodes [`RecordBatch`] to a parquet row group
struct ArrowRowGroupWriter {
    writers: Vec<ArrowColumnWriter>,
    schema: SchemaRef,
    buffered_rows: usize,
}

impl ArrowRowGroupWriter {
    fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
        Self {
            writers,
            schema: arrow.clone(),
            buffered_rows: 0,
        }
    }

    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        self.buffered_rows += batch.num_rows();
        let mut writers = self.writers.iter_mut();
        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
            for leaf in compute_leaves(field.as_ref(), column)? {
                writers.next().unwrap().write(&leaf)?
            }
        }
        Ok(())
    }

    fn close(self) -> Result<Vec<ArrowColumnChunk>> {
        self.writers
            .into_iter()
            .map(|writer| writer.close())
            .collect()
    }
}

struct ArrowRowGroupWriterFactory {
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}

impl ArrowRowGroupWriterFactory {
    #[cfg(feature = "encryption")]
    fn new<W: Write + Send>(file_writer: &SerializedFileWriter<W>) -> Self {
        Self {
            file_encryptor: file_writer.file_encryptor(),
        }
    }

    #[cfg(not(feature = "encryption"))]
    fn new<W: Write + Send>(_file_writer: &SerializedFileWriter<W>) -> Self {
        Self {}
    }

    #[cfg(feature = "encryption")]
    fn create_row_group_writer(
        &self,
        parquet: &SchemaDescriptor,
        props: &WriterPropertiesPtr,
        arrow: &SchemaRef,
        row_group_index: usize,
    ) -> Result<ArrowRowGroupWriter> {
        let writers = get_column_writers_with_encryptor(
            parquet,
            props,
            arrow,
            self.file_encryptor.clone(),
            row_group_index,
        )?;
        Ok(ArrowRowGroupWriter::new(writers, arrow))
    }

    #[cfg(not(feature = "encryption"))]
    fn create_row_group_writer(
        &self,
        parquet: &SchemaDescriptor,
        props: &WriterPropertiesPtr,
        arrow: &SchemaRef,
        _row_group_index: usize,
    ) -> Result<ArrowRowGroupWriter> {
        let writers = get_column_writers(parquet, props, arrow)?;
        Ok(ArrowRowGroupWriter::new(writers, arrow))
    }
}

/// Returns the [`ArrowColumnWriter`]s for a given schema
pub fn get_column_writers(
    parquet: &SchemaDescriptor,
    props: &WriterPropertiesPtr,
    arrow: &SchemaRef,
) -> Result<Vec<ArrowColumnWriter>> {
    let mut writers = Vec::with_capacity(arrow.fields.len());
    let mut leaves = parquet.columns().iter();
    let column_factory = ArrowColumnWriterFactory::new();
    for field in &arrow.fields {
        column_factory.get_arrow_column_writer(
            field.data_type(),
            props,
            &mut leaves,
            &mut writers,
        )?;
    }
    Ok(writers)
}

/// Returns the [`ArrowColumnWriter`]s for a given schema, with support for columnar encryption
#[cfg(feature = "encryption")]
fn get_column_writers_with_encryptor(
    parquet: &SchemaDescriptor,
    props: &WriterPropertiesPtr,
    arrow: &SchemaRef,
    file_encryptor: Option<Arc<FileEncryptor>>,
    row_group_index: usize,
) -> Result<Vec<ArrowColumnWriter>> {
    let mut writers = Vec::with_capacity(arrow.fields.len());
    let mut leaves = parquet.columns().iter();
    let column_factory =
        ArrowColumnWriterFactory::new().with_file_encryptor(row_group_index, file_encryptor);
    for field in &arrow.fields {
        column_factory.get_arrow_column_writer(
            field.data_type(),
            props,
            &mut leaves,
            &mut writers,
        )?;
    }
    Ok(writers)
}

/// Gets [`ArrowColumnWriter`] instances for different data types
struct ArrowColumnWriterFactory {
    #[cfg(feature = "encryption")]
    row_group_index: usize,
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}

impl ArrowColumnWriterFactory {
    pub fn new() -> Self {
        Self {
            #[cfg(feature = "encryption")]
            row_group_index: 0,
            #[cfg(feature = "encryption")]
            file_encryptor: None,
        }
    }

    #[cfg(feature = "encryption")]
    pub fn with_file_encryptor(
        mut self,
        row_group_index: usize,
        file_encryptor: Option<Arc<FileEncryptor>>,
    ) -> Self {
        self.row_group_index = row_group_index;
        self.file_encryptor = file_encryptor;
        self
    }

    #[cfg(feature = "encryption")]
    fn create_page_writer(
        &self,
        column_descriptor: &ColumnDescPtr,
        column_index: usize,
    ) -> Result<Box<ArrowPageWriter>> {
        let column_path = column_descriptor.path().string();
        let page_encryptor = PageEncryptor::create_if_column_encrypted(
            &self.file_encryptor,
            self.row_group_index,
            column_index,
            &column_path,
        )?;
        Ok(Box::new(
            ArrowPageWriter::default().with_encryptor(page_encryptor),
        ))
    }

    #[cfg(not(feature = "encryption"))]
    fn create_page_writer(
        &self,
        _column_descriptor: &ColumnDescPtr,
        _column_index: usize,
    ) -> Result<Box<ArrowPageWriter>> {
        Ok(Box::<ArrowPageWriter>::default())
    }

    /// Gets the [`ArrowColumnWriter`] for the given `data_type`
    fn get_arrow_column_writer(
        &self,
        data_type: &ArrowDataType,
        props: &WriterPropertiesPtr,
        leaves: &mut Iter<'_, ColumnDescPtr>,
        out: &mut Vec<ArrowColumnWriter>,
    ) -> Result<()> {
        let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
            let page_writer = self.create_page_writer(desc, out.len())?;
            let chunk = page_writer.buffer.clone();
            let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
            Ok(ArrowColumnWriter {
                chunk,
                writer: ArrowColumnWriterImpl::Column(writer),
            })
        };

        let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
            let page_writer = self.create_page_writer(desc, out.len())?;
            let chunk = page_writer.buffer.clone();
            let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
            Ok(ArrowColumnWriter {
                chunk,
                writer: ArrowColumnWriterImpl::ByteArray(writer),
            })
        };

        match data_type {
            _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
                out.push(col(leaves.next().unwrap())?)
            }
            ArrowDataType::LargeBinary
            | ArrowDataType::Binary
            | ArrowDataType::Utf8
            | ArrowDataType::LargeUtf8
            | ArrowDataType::BinaryView
            | ArrowDataType::Utf8View => {
                out.push(bytes(leaves.next().unwrap())?)
            }
            ArrowDataType::List(f)
            | ArrowDataType::LargeList(f)
            | ArrowDataType::FixedSizeList(f, _) => {
                self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
            }
            ArrowDataType::Struct(fields) => {
                for field in fields {
                    self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
                }
            }
            ArrowDataType::Map(f, _) => match f.data_type() {
                ArrowDataType::Struct(f) => {
                    self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
                    self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
                }
                _ => unreachable!("invalid map type"),
            }
            ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Binary | ArrowDataType::LargeBinary => {
                    out.push(bytes(leaves.next().unwrap())?)
                }
                ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
                    out.push(bytes(leaves.next().unwrap())?)
                }
                ArrowDataType::FixedSizeBinary(_) => {
                    out.push(bytes(leaves.next().unwrap())?)
                }
                _ => {
                    out.push(col(leaves.next().unwrap())?)
                }
            }
            _ => return Err(ParquetError::NYI(
                format!(
                    "Attempting to write an Arrow type {data_type:?} to parquet that is not yet implemented"
                )
            ))
        }
        Ok(())
    }
}

fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
    let column = levels.array().as_ref();
    let indices = levels.non_null_indices();
    match writer {
        ColumnWriter::Int32ColumnWriter(ref mut typed) => {
            match column.data_type() {
                ArrowDataType::Date64 => {
                    // If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
                    let array = arrow_cast::cast(column, &ArrowDataType::Date32)?;
                    let array = arrow_cast::cast(&array, &ArrowDataType::Int32)?;

                    let array = array.as_primitive::<Int32Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::UInt32 => {
                    let values = column.as_primitive::<UInt32Type>().values();
                    // follow C++ implementation and use overflow/reinterpret cast from u32 to i32 which will map
                    // `(i32::MAX as u32)..u32::MAX` to `i32::MIN..0`
                    let array = values.inner().typed_data::<i32>();
                    write_primitive(typed, array, levels)
                }
                ArrowDataType::Decimal128(_, _) => {
                    // use the int32 to represent the decimal with low precision
                    let array = column
                        .as_primitive::<Decimal128Type>()
                        .unary::<_, Int32Type>(|v| v as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal256(_, _) => {
                    // use the int32 to represent the decimal with low precision
                    let array = column
                        .as_primitive::<Decimal256Type>()
                        .unary::<_, Int32Type>(|v| v.as_i128() as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                    ArrowDataType::Decimal128(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal128Type>()
                            .unary::<_, Int32Type>(|v| v as i32);
                        write_primitive(typed, array.values(), levels)
                    }
                    ArrowDataType::Decimal256(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal256Type>()
                            .unary::<_, Int32Type>(|v| v.as_i128() as i32);
                        write_primitive(typed, array.values(), levels)
                    }
                    _ => {
                        let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
                        let array = array.as_primitive::<Int32Type>();
                        write_primitive(typed, array.values(), levels)
                    }
                },
                _ => {
                    let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
                    let array = array.as_primitive::<Int32Type>();
                    write_primitive(typed, array.values(), levels)
                }
            }
        }
        ColumnWriter::BoolColumnWriter(ref mut typed) => {
            let array = column.as_boolean();
            typed.write_batch(
                get_bool_array_slice(array, indices).as_slice(),
                levels.def_levels(),
                levels.rep_levels(),
            )
        }
        ColumnWriter::Int64ColumnWriter(ref mut typed) => {
            match column.data_type() {
                ArrowDataType::Date64 => {
                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;

                    let array = array.as_primitive::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Int64 => {
                    let array = column.as_primitive::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::UInt64 => {
                    let values = column.as_primitive::<UInt64Type>().values();
                    // follow C++ implementation and use overflow/reinterpret cast from u64 to i64 which will map
                    // `(i64::MAX as u64)..u64::MAX` to `i64::MIN..0`
                    let array = values.inner().typed_data::<i64>();
                    write_primitive(typed, array, levels)
                }
                ArrowDataType::Decimal128(_, _) => {
                    // use the int64 to represent the decimal with low precision
                    let array = column
                        .as_primitive::<Decimal128Type>()
                        .unary::<_, Int64Type>(|v| v as i64);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal256(_, _) => {
                    // use the int64 to represent the decimal with low precision
                    let array = column
                        .as_primitive::<Decimal256Type>()
                        .unary::<_, Int64Type>(|v| v.as_i128() as i64);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                    ArrowDataType::Decimal128(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal128Type>()
                            .unary::<_, Int64Type>(|v| v as i64);
                        write_primitive(typed, array.values(), levels)
                    }
                    ArrowDataType::Decimal256(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal256Type>()
                            .unary::<_, Int64Type>(|v| v.as_i128() as i64);
                        write_primitive(typed, array.values(), levels)
                    }
                    _ => {
                        let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
                        let array = array.as_primitive::<Int64Type>();
                        write_primitive(typed, array.values(), levels)
                    }
                },
                _ => {
                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
                    let array = array.as_primitive::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
            }
        }
        ColumnWriter::Int96ColumnWriter(ref mut _typed) => {
            unreachable!("Currently unreachable because data type not supported")
        }
        ColumnWriter::FloatColumnWriter(ref mut typed) => {
            let array = column.as_primitive::<Float32Type>();
            write_primitive(typed, array.values(), levels)
        }
        ColumnWriter::DoubleColumnWriter(ref mut typed) => {
            let array = column.as_primitive::<Float64Type>();
            write_primitive(typed, array.values(), levels)
        }
        ColumnWriter::ByteArrayColumnWriter(_) => {
            unreachable!("should use ByteArrayWriter")
        }
        ColumnWriter::FixedLenByteArrayColumnWriter(ref mut typed) => {
            let bytes = match column.data_type() {
                ArrowDataType::Interval(interval_unit) => match interval_unit {
                    IntervalUnit::YearMonth => {
                        let array = column
                            .as_any()
                            .downcast_ref::<arrow_array::IntervalYearMonthArray>()
                            .unwrap();
                        get_interval_ym_array_slice(array, indices)
                    }
                    IntervalUnit::DayTime => {
                        let array = column
                            .as_any()
                            .downcast_ref::<arrow_array::IntervalDayTimeArray>()
                            .unwrap();
                        get_interval_dt_array_slice(array, indices)
                    }
                    _ => {
                        return Err(ParquetError::NYI(
                            format!(
                                "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
                            )
                        ));
                    }
                },
                ArrowDataType::FixedSizeBinary(_) => {
                    let array = column
                        .as_any()
                        .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
                        .unwrap();
                    get_fsb_array_slice(array, indices)
                }
                ArrowDataType::Decimal128(_, _) => {
                    let array = column.as_primitive::<Decimal128Type>();
                    get_decimal_128_array_slice(array, indices)
                }
                ArrowDataType::Decimal256(_, _) => {
                    let array = column
                        .as_any()
                        .downcast_ref::<arrow_array::Decimal256Array>()
                        .unwrap();
                    get_decimal_256_array_slice(array, indices)
                }
                ArrowDataType::Float16 => {
                    let array = column.as_primitive::<Float16Type>();
                    get_float_16_array_slice(array, indices)
                }
                _ => {
                    return Err(ParquetError::NYI(
                        "Attempting to write an Arrow type that is not yet implemented".to_string(),
                    ));
                }
            };
            typed.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels())
        }
    }
}

fn write_primitive<E: ColumnValueEncoder>(
    writer: &mut GenericColumnWriter<E>,
    values: &E::Values,
    levels: &ArrayLevels,
) -> Result<usize> {
    writer.write_batch_internal(
        values,
        Some(levels.non_null_indices()),
        levels.def_levels(),
        levels.rep_levels(),
        None,
        None,
        None,
    )
}

fn get_bool_array_slice(array: &arrow_array::BooleanArray, indices: &[usize]) -> Vec<bool> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        values.push(array.value(*i))
    }
    values
}

/// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each).
/// An Arrow YearMonth interval only stores months, thus only the first 4 bytes are populated.
fn get_interval_ym_array_slice(
    array: &arrow_array::IntervalYearMonthArray,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let mut value = array.value(*i).to_le_bytes().to_vec();
        let mut suffix = vec![0; 8];
        value.append(&mut suffix);
        values.push(FixedLenByteArray::from(ByteArray::from(value)))
    }
    values
}

/// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each).
/// An Arrow DayTime interval only stores days and millis, thus the first 4 bytes are not populated.
fn get_interval_dt_array_slice(
    array: &arrow_array::IntervalDayTimeArray,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let mut out = [0; 12];
        let value = array.value(*i);
        out[4..8].copy_from_slice(&value.days.to_le_bytes());
        out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
        values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
    }
    values
}

fn get_decimal_128_array_slice(
    array: &arrow_array::Decimal128Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    let size = decimal_length_from_precision(array.precision());
    for i in indices {
        let as_be_bytes = array.value(*i).to_be_bytes();
        let resized_value = as_be_bytes[(16 - size)..].to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
    }
    values
}

fn get_decimal_256_array_slice(
    array: &arrow_array::Decimal256Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    let size = decimal_length_from_precision(array.precision());
    for i in indices {
        let as_be_bytes = array.value(*i).to_be_bytes();
        let resized_value = as_be_bytes[(32 - size)..].to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
    }
    values
}

fn get_float_16_array_slice(
    array: &arrow_array::Float16Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let value = array.value(*i).to_le_bytes().to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(value)));
    }
    values
}

fn get_fsb_array_slice(
    array: &arrow_array::FixedSizeBinaryArray,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let value = array.value(*i).to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(value)))
    }
    values
}
#[cfg(test)]
mod tests {
    use super::*;

    use std::fs::File;
    use std::io::Seek;

    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
    use crate::arrow::ARROW_SCHEMA_META_KEY;
    use crate::column::page::{Page, PageReader};
    use crate::file::page_encoding_stats::PageEncodingStats;
    use crate::file::reader::SerializedPageReader;
    use crate::format::PageHeader;
    use crate::schema::types::ColumnPath;
    use crate::thrift::TCompactSliceInputProtocol;
    use arrow::datatypes::ToByteSlice;
    use arrow::datatypes::{DataType, Schema};
    use arrow::error::Result as ArrowResult;
    use arrow::util::data_gen::create_random_array;
    use arrow::util::pretty::pretty_format_batches;
    use arrow::{array::*, buffer::Buffer};
    use arrow_buffer::{i256, IntervalDayTime, IntervalMonthDayNano, NullBuffer};
    use arrow_schema::Fields;
    use half::f16;
    use num::{FromPrimitive, ToPrimitive};

    use crate::basic::Encoding;
    use crate::data_type::AsBytes;
    use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
    use crate::file::page_index::index::Index;
    use crate::file::properties::{
        BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
    };
    use crate::file::serialized_reader::ReadOptionsBuilder;
    use crate::file::{
        reader::{FileReader, SerializedFileReader},
        statistics::Statistics,
    };

    #[test]
    fn arrow_writer() {
        // define schema
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, true),
        ]);

        // create some data
        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);

        // build a record batch
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
        let mut buffer = vec![];

        let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
        writer.write(expected_batch).unwrap();
        writer.close().unwrap();

        buffer
    }

    fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
        let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
        writer.write(expected_batch).unwrap();
        writer.into_inner().unwrap()
    }

    #[test]
    fn roundtrip_bytes() {
        // define schema
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, true),
        ]));

        // create some data
        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);

        // build a record batch
        let expected_batch =
            RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();

        for buffer in [
            get_bytes_after_close(schema.clone(), &expected_batch),
            get_bytes_by_into_inner(schema, &expected_batch),
        ] {
            let cursor = Bytes::from(buffer);
            let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();

            let actual_batch = record_batch_reader
                .next()
                .expect("No batch found")
                .expect("Unable to get batch");

            assert_eq!(expected_batch.schema(), actual_batch.schema());
            assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
            assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
            for i in 0..expected_batch.num_columns() {
                let expected_data = expected_batch.column(i).to_data();
                let actual_data = actual_batch.column(i).to_data();

                assert_eq!(expected_data, actual_data);
            }
        }
    }

    #[test]
    fn arrow_writer_non_null() {
        // define schema
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

        // create some data
        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);

        // build a record batch
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    #[test]
    fn arrow_writer_list() {
        // define schema
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
            true,
        )]);

        // create some data
        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

        // Construct a buffer for value offsets, for the nested array:
        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());

        // Construct a list array from the above two
        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
            DataType::Int32,
            false,
        ))))
        .len(5)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .null_bit_buffer(Some(Buffer::from([0b00011011])))
        .build()
        .unwrap();
        let a = ListArray::from(a_list_data);

        // build a record batch
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        assert_eq!(batch.column(0).null_count(), 1);
1494
1495        // This test fails if the max row group size is less than the batch's length
1496        // see https://github.com/apache/arrow-rs/issues/518
1497        roundtrip(batch, None);
1498    }
1499
1500    #[test]
1501    fn arrow_writer_list_non_null() {
1502        // define schema
1503        let schema = Schema::new(vec![Field::new(
1504            "a",
1505            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1506            false,
1507        )]);
1508
1509        // create some data
1510        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1511
1512        // Construct a buffer for value offsets, for the nested array:
1513        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
1514        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1515
1516        // Construct a list array from the above two
1517        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1518            DataType::Int32,
1519            false,
1520        ))))
1521        .len(5)
1522        .add_buffer(a_value_offsets)
1523        .add_child_data(a_values.into_data())
1524        .build()
1525        .unwrap();
1526        let a = ListArray::from(a_list_data);
1527
1528        // build a record batch
1529        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1530
1531        // This test fails if the max row group size is less than the batch's length
1532        // see https://github.com/apache/arrow-rs/issues/518
1533        assert_eq!(batch.column(0).null_count(), 0);
1534
1535        roundtrip(batch, None);
1536    }
1537
1538    #[test]
1539    fn arrow_writer_binary() {
1540        let string_field = Field::new("a", DataType::Utf8, false);
1541        let binary_field = Field::new("b", DataType::Binary, false);
1542        let schema = Schema::new(vec![string_field, binary_field]);
1543
1544        let raw_string_values = vec!["foo", "bar", "baz", "quux"];
1545        let raw_binary_values = [
1546            b"foo".to_vec(),
1547            b"bar".to_vec(),
1548            b"baz".to_vec(),
1549            b"quux".to_vec(),
1550        ];
1551        let raw_binary_value_refs = raw_binary_values
1552            .iter()
1553            .map(|x| x.as_slice())
1554            .collect::<Vec<_>>();
1555
1556        let string_values = StringArray::from(raw_string_values.clone());
1557        let binary_values = BinaryArray::from(raw_binary_value_refs);
1558        let batch = RecordBatch::try_new(
1559            Arc::new(schema),
1560            vec![Arc::new(string_values), Arc::new(binary_values)],
1561        )
1562        .unwrap();
1563
1564        roundtrip(batch, Some(SMALL_SIZE / 2));
1565    }
1566
1567    #[test]
1568    fn arrow_writer_binary_view() {
1569        let string_field = Field::new("a", DataType::Utf8View, false);
1570        let binary_field = Field::new("b", DataType::BinaryView, false);
1571        let nullable_string_field = Field::new("a", DataType::Utf8View, true);
1572        let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
1573
1574        let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
1575        let raw_binary_values = vec![
1576            b"foo".to_vec(),
1577            b"bar".to_vec(),
1578            b"large payload over 12 bytes".to_vec(),
1579            b"lulu".to_vec(),
1580        ];
1581        let nullable_string_values =
1582            vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
1583
1584        let string_view_values = StringViewArray::from(raw_string_values);
1585        let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
1586        let nullable_string_view_values = StringViewArray::from(nullable_string_values);
1587        let batch = RecordBatch::try_new(
1588            Arc::new(schema),
1589            vec![
1590                Arc::new(string_view_values),
1591                Arc::new(binary_view_values),
1592                Arc::new(nullable_string_view_values),
1593            ],
1594        )
1595        .unwrap();
1596
1597        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1598        roundtrip(batch, None);
1599    }
1600
1601    fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
1602        let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
1603        let schema = Schema::new(vec![decimal_field]);
1604
1605        let decimal_values = vec![10_000, 50_000, 0, -100]
1606            .into_iter()
1607            .map(Some)
1608            .collect::<Decimal128Array>()
1609            .with_precision_and_scale(precision, scale)
1610            .unwrap();
1611
1612        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
1613    }
1614
1615    #[test]
1616    fn arrow_writer_decimal() {
1617        // int32 to store the decimal value
1618        let batch_int32_decimal = get_decimal_batch(5, 2);
1619        roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
1620        // int64 to store the decimal value
1621        let batch_int64_decimal = get_decimal_batch(12, 2);
1622        roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
1623        // fixed_length_byte_array to store the decimal value
1624        let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
1625        roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
1626    }
1627
1628    #[test]
1629    fn arrow_writer_complex() {
1630        // define schema
1631        let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1632        let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1633        let struct_field_g = Arc::new(Field::new_list(
1634            "g",
1635            Field::new_list_field(DataType::Int16, true),
1636            false,
1637        ));
1638        let struct_field_h = Arc::new(Field::new_list(
1639            "h",
1640            Field::new_list_field(DataType::Int16, false),
1641            true,
1642        ));
1643        let struct_field_e = Arc::new(Field::new_struct(
1644            "e",
1645            vec![
1646                struct_field_f.clone(),
1647                struct_field_g.clone(),
1648                struct_field_h.clone(),
1649            ],
1650            false,
1651        ));
1652        let schema = Schema::new(vec![
1653            Field::new("a", DataType::Int32, false),
1654            Field::new("b", DataType::Int32, true),
1655            Field::new_struct(
1656                "c",
1657                vec![struct_field_d.clone(), struct_field_e.clone()],
1658                false,
1659            ),
1660        ]);
1661
1662        // create some data
1663        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1664        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1665        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1666        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1667
1668        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1669
1670        // Construct a buffer for value offsets, for the nested array:
1671        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
1672        let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1673
1674        // Construct a list array from the above two
1675        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1676            .len(5)
1677            .add_buffer(g_value_offsets.clone())
1678            .add_child_data(g_value.to_data())
1679            .build()
1680            .unwrap();
1681        let g = ListArray::from(g_list_data);
1682        // The difference between g and h is that h has a null bitmap
1683        let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
1684            .len(5)
1685            .add_buffer(g_value_offsets)
1686            .add_child_data(g_value.to_data())
1687            .null_bit_buffer(Some(Buffer::from([0b00011011])))
1688            .build()
1689            .unwrap();
1690        let h = ListArray::from(h_list_data);
1691
1692        let e = StructArray::from(vec![
1693            (struct_field_f, Arc::new(f) as ArrayRef),
1694            (struct_field_g, Arc::new(g) as ArrayRef),
1695            (struct_field_h, Arc::new(h) as ArrayRef),
1696        ]);
1697
1698        let c = StructArray::from(vec![
1699            (struct_field_d, Arc::new(d) as ArrayRef),
1700            (struct_field_e, Arc::new(e) as ArrayRef),
1701        ]);
1702
1703        // build a record batch
1704        let batch = RecordBatch::try_new(
1705            Arc::new(schema),
1706            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
1707        )
1708        .unwrap();
1709
1710        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1711        roundtrip(batch, Some(SMALL_SIZE / 3));
1712    }
1713
1714    #[test]
1715    fn arrow_writer_complex_mixed() {
1716        // This test was added while investigating https://github.com/apache/arrow-rs/issues/244.
1717        // It was subsequently fixed while investigating https://github.com/apache/arrow-rs/issues/245.
1718
1719        // define schema
1720        let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
1721        let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
1722        let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
1723        let schema = Schema::new(vec![Field::new(
1724            "some_nested_object",
1725            DataType::Struct(Fields::from(vec![
1726                offset_field.clone(),
1727                partition_field.clone(),
1728                topic_field.clone(),
1729            ])),
1730            false,
1731        )]);
1732
1733        // create some data
1734        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1735        let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1736        let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
1737
1738        let some_nested_object = StructArray::from(vec![
1739            (offset_field, Arc::new(offset) as ArrayRef),
1740            (partition_field, Arc::new(partition) as ArrayRef),
1741            (topic_field, Arc::new(topic) as ArrayRef),
1742        ]);
1743
1744        // build a record batch
1745        let batch =
1746            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1747
1748        roundtrip(batch, Some(SMALL_SIZE / 2));
1749    }
1750
1751    #[test]
1752    fn arrow_writer_map() {
1753        // Note: we are using the JSON Arrow reader for brevity
1754        let json_content = r#"
1755        {"stocks":{"long": "$AAA", "short": "$BBB"}}
1756        {"stocks":{"long": null, "long": "$CCC", "short": null}}
1757        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1758        "#;
1759        let entries_struct_type = DataType::Struct(Fields::from(vec![
1760            Field::new("key", DataType::Utf8, false),
1761            Field::new("value", DataType::Utf8, true),
1762        ]));
1763        let stocks_field = Field::new(
1764            "stocks",
1765            DataType::Map(
1766                Arc::new(Field::new("entries", entries_struct_type, false)),
1767                false,
1768            ),
1769            true,
1770        );
1771        let schema = Arc::new(Schema::new(vec![stocks_field]));
1772        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
1773        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
1774
1775        let batch = reader.next().unwrap().unwrap();
1776        roundtrip(batch, None);
1777    }
1778
1779    #[test]
1780    fn arrow_writer_2_level_struct() {
1781        // tests writing struct<struct<primitive>>
1782        let field_c = Field::new("c", DataType::Int32, true);
1783        let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
1784        let type_a = DataType::Struct(vec![field_b.clone()].into());
1785        let field_a = Field::new("a", type_a, true);
1786        let schema = Schema::new(vec![field_a.clone()]);
1787
1788        // create data
1789        let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
1790        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
1791            .len(6)
1792            .null_bit_buffer(Some(Buffer::from([0b00100111])))
1793            .add_child_data(c.into_data())
1794            .build()
1795            .unwrap();
1796        let b = StructArray::from(b_data);
1797        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
1798            .len(6)
1799            .null_bit_buffer(Some(Buffer::from([0b00101111])))
1800            .add_child_data(b.into_data())
1801            .build()
1802            .unwrap();
1803        let a = StructArray::from(a_data);
1804
1805        assert_eq!(a.null_count(), 1);
1806        assert_eq!(a.column(0).null_count(), 2);
1807
1808        // build a record batch
1809        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1810
1811        roundtrip(batch, Some(SMALL_SIZE / 2));
1812    }
1813
1814    #[test]
1815    fn arrow_writer_2_level_struct_non_null() {
1816        // tests writing struct<struct<primitive>>
1817        let field_c = Field::new("c", DataType::Int32, false);
1818        let type_b = DataType::Struct(vec![field_c].into());
1819        let field_b = Field::new("b", type_b.clone(), false);
1820        let type_a = DataType::Struct(vec![field_b].into());
1821        let field_a = Field::new("a", type_a.clone(), false);
1822        let schema = Schema::new(vec![field_a]);
1823
1824        // create data
1825        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
1826        let b_data = ArrayDataBuilder::new(type_b)
1827            .len(6)
1828            .add_child_data(c.into_data())
1829            .build()
1830            .unwrap();
1831        let b = StructArray::from(b_data);
1832        let a_data = ArrayDataBuilder::new(type_a)
1833            .len(6)
1834            .add_child_data(b.into_data())
1835            .build()
1836            .unwrap();
1837        let a = StructArray::from(a_data);
1838
1839        assert_eq!(a.null_count(), 0);
1840        assert_eq!(a.column(0).null_count(), 0);
1841
1842        // build a record batch
1843        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1844
1845        roundtrip(batch, Some(SMALL_SIZE / 2));
1846    }
1847
1848    #[test]
1849    fn arrow_writer_2_level_struct_mixed_null() {
1850        // tests writing struct<struct<primitive>>
1851        let field_c = Field::new("c", DataType::Int32, false);
1852        let type_b = DataType::Struct(vec![field_c].into());
1853        let field_b = Field::new("b", type_b.clone(), true);
1854        let type_a = DataType::Struct(vec![field_b].into());
1855        let field_a = Field::new("a", type_a.clone(), false);
1856        let schema = Schema::new(vec![field_a]);
1857
1858        // create data
1859        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
1860        let b_data = ArrayDataBuilder::new(type_b)
1861            .len(6)
1862            .null_bit_buffer(Some(Buffer::from([0b00100111])))
1863            .add_child_data(c.into_data())
1864            .build()
1865            .unwrap();
1866        let b = StructArray::from(b_data);
1867        // `a` intentionally has no null buffer, to test that this is handled correctly
1868        let a_data = ArrayDataBuilder::new(type_a)
1869            .len(6)
1870            .add_child_data(b.into_data())
1871            .build()
1872            .unwrap();
1873        let a = StructArray::from(a_data);
1874
1875        assert_eq!(a.null_count(), 0);
1876        assert_eq!(a.column(0).null_count(), 2);
1877
1878        // build a record batch
1879        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1880
1881        roundtrip(batch, Some(SMALL_SIZE / 2));
1882    }
1883
1884    #[test]
1885    fn arrow_writer_2_level_struct_mixed_null_2() {
1886        // tests writing struct<struct<primitive>>, where the primitive columns are non-null.
1887        let field_c = Field::new("c", DataType::Int32, false);
1888        let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
1889        let field_e = Field::new(
1890            "e",
1891            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1892            false,
1893        );
1894
1895        let field_b = Field::new(
1896            "b",
1897            DataType::Struct(vec![field_c, field_d, field_e].into()),
1898            false,
1899        );
1900        let type_a = DataType::Struct(vec![field_b.clone()].into());
1901        let field_a = Field::new("a", type_a, true);
1902        let schema = Schema::new(vec![field_a.clone()]);
1903
1904        // create data
1905        let c = Int32Array::from_iter_values(0..6);
1906        let d = FixedSizeBinaryArray::try_from_iter(
1907            ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
1908        )
1909        .expect("four byte values");
1910        let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
1911        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
1912            .len(6)
1913            .add_child_data(c.into_data())
1914            .add_child_data(d.into_data())
1915            .add_child_data(e.into_data())
1916            .build()
1917            .unwrap();
1918        let b = StructArray::from(b_data);
1919        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
1920            .len(6)
1921            .null_bit_buffer(Some(Buffer::from([0b00100101])))
1922            .add_child_data(b.into_data())
1923            .build()
1924            .unwrap();
1925        let a = StructArray::from(a_data);
1926
1927        assert_eq!(a.null_count(), 3);
1928        assert_eq!(a.column(0).null_count(), 0);
1929
1930        // build a record batch
1931        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1932
1933        roundtrip(batch, Some(SMALL_SIZE / 2));
1934    }
1935
1936    #[test]
1937    fn test_fixed_size_binary_in_dict() {
1938        fn test_fixed_size_binary_in_dict_inner<K>()
1939        where
1940            K: ArrowDictionaryKeyType,
1941            K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
1942            <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
1943        {
1944            let field = Field::new(
1945                "a",
1946                DataType::Dictionary(
1947                    Box::new(K::DATA_TYPE),
1948                    Box::new(DataType::FixedSizeBinary(4)),
1949                ),
1950                false,
1951            );
1952            let schema = Schema::new(vec![field]);
1953
1954            let keys: Vec<K::Native> = vec![
1955                K::Native::try_from(0u8).unwrap(),
1956                K::Native::try_from(0u8).unwrap(),
1957                K::Native::try_from(1u8).unwrap(),
1958            ];
1959            let keys = PrimitiveArray::<K>::from_iter_values(keys);
1960            let values = FixedSizeBinaryArray::try_from_iter(
1961                vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
1962            )
1963            .unwrap();
1964
1965            let data = DictionaryArray::<K>::new(keys, Arc::new(values));
1966            let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
1967            roundtrip(batch, None);
1968        }
1969
1970        test_fixed_size_binary_in_dict_inner::<UInt8Type>();
1971        test_fixed_size_binary_in_dict_inner::<UInt16Type>();
1972        test_fixed_size_binary_in_dict_inner::<UInt32Type>();
1973        test_fixed_size_binary_in_dict_inner::<UInt64Type>();
1974        test_fixed_size_binary_in_dict_inner::<Int8Type>();
1975        test_fixed_size_binary_in_dict_inner::<Int16Type>();
1976        test_fixed_size_binary_in_dict_inner::<Int32Type>();
1977        test_fixed_size_binary_in_dict_inner::<Int64Type>();
1978    }
1979
1980    #[test]
1981    fn test_empty_dict() {
1982        let struct_fields = Fields::from(vec![Field::new(
1983            "dict",
1984            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1985            false,
1986        )]);
1987
1988        let schema = Schema::new(vec![Field::new_struct(
1989            "struct",
1990            struct_fields.clone(),
1991            true,
1992        )]);
1993        let dictionary = Arc::new(DictionaryArray::new(
1994            Int32Array::new_null(5),
1995            Arc::new(StringArray::new_null(0)),
1996        ));
1997
1998        let s = StructArray::new(
1999            struct_fields,
2000            vec![dictionary],
2001            Some(NullBuffer::new_null(5)),
2002        );
2003
2004        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
2005        roundtrip(batch, None);
2006    }
2007    #[test]
2008    fn arrow_writer_page_size() {
2009        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
2010
2011        let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
2012
2013        // Generate an array of 10 unique 10-character strings
2014        for i in 0..10 {
2015            let value = i
2016                .to_string()
2017                .repeat(10)
2018                .chars()
2019                .take(10)
2020                .collect::<String>();
2021
2022            builder.append_value(value);
2023        }
2024
2025        let array = Arc::new(builder.finish());
2026
2027        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
2028
2029        let file = tempfile::tempfile().unwrap();
2030
2031        // Set everything very low so we fall back to PLAIN encoding after the first row
2032        let props = WriterProperties::builder()
2033            .set_data_page_size_limit(1)
2034            .set_dictionary_page_size_limit(1)
2035            .set_write_batch_size(1)
2036            .build();
2037
2038        let mut writer =
2039            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
2040                .expect("Unable to write file");
2041        writer.write(&batch).unwrap();
2042        writer.close().unwrap();
2043
2044        let options = ReadOptionsBuilder::new().with_page_index().build();
2045        let reader =
2046            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap();
2047
2048        let column = reader.metadata().row_group(0).columns();
2049
2050        assert_eq!(column.len(), 1);
2051
2052        // We should write one row before falling back to PLAIN encoding so there should still be a
2053        // dictionary page.
2054        assert!(
2055            column[0].dictionary_page_offset().is_some(),
2056            "Expected a dictionary page"
2057        );
2058
2059        assert!(reader.metadata().offset_index().is_some());
2060        let offset_indexes = &reader.metadata().offset_index().unwrap()[0];
2061
2062        let page_locations = offset_indexes[0].page_locations.clone();
2063
2064        // We should fall back to PLAIN encoding after the first row, and our max page size is 1 byte,
2065        // so we expect one dictionary-encoded page and then a page per row thereafter.
2066        assert_eq!(
2067            page_locations.len(),
2068            10,
2069            "Expected 10 pages but got {page_locations:#?}"
2070        );
2071    }
2072
2073    #[test]
2074    fn arrow_writer_float_nans() {
2075        let f16_field = Field::new("a", DataType::Float16, false);
2076        let f32_field = Field::new("b", DataType::Float32, false);
2077        let f64_field = Field::new("c", DataType::Float64, false);
2078        let schema = Schema::new(vec![f16_field, f32_field, f64_field]);
2079
2080        let f16_values = (0..MEDIUM_SIZE)
2081            .map(|i| {
2082                Some(if i % 2 == 0 {
2083                    f16::NAN
2084                } else {
2085                    f16::from_f32(i as f32)
2086                })
2087            })
2088            .collect::<Float16Array>();
2089
2090        let f32_values = (0..MEDIUM_SIZE)
2091            .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
2092            .collect::<Float32Array>();
2093
2094        let f64_values = (0..MEDIUM_SIZE)
2095            .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
2096            .collect::<Float64Array>();
2097
2098        let batch = RecordBatch::try_new(
2099            Arc::new(schema),
2100            vec![
2101                Arc::new(f16_values),
2102                Arc::new(f32_values),
2103                Arc::new(f64_values),
2104            ],
2105        )
2106        .unwrap();
2107
2108        roundtrip(batch, None);
2109    }
2110
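    // Row counts used to size the batches in the roundtrip tests below.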
2111    const SMALL_SIZE: usize = 7;
2112    const MEDIUM_SIZE: usize = 63;
2113
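    /// Round-trips `expected_batch` under both Parquet writer versions (optionally capping the
    /// row group size) and returns the files written.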
2114    fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<File> {
2115        let mut files = vec![];
2116        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2117            let mut props = WriterProperties::builder().set_writer_version(version);
2118
2119            if let Some(size) = max_row_group_size {
2120                props = props.set_max_row_group_size(size)
2121            }
2122
2123            let props = props.build();
2124            files.push(roundtrip_opts(&expected_batch, props))
2125        }
2126        files
2127    }
2128
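    /// Writes `expected_batch` to a temporary file with the given properties, reads it back, and
    /// runs `validate` against the expected and actual `ArrayData` of every column.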
2129    fn roundtrip_opts_with_array_validation<F>(
2130        expected_batch: &RecordBatch,
2131        props: WriterProperties,
2132        validate: F,
2133    ) -> File
2134    where
2135        F: Fn(&ArrayData, &ArrayData),
2136    {
2137        let file = tempfile::tempfile().unwrap();
2138
2139        let mut writer = ArrowWriter::try_new(
2140            file.try_clone().unwrap(),
2141            expected_batch.schema(),
2142            Some(props),
2143        )
2144        .expect("Unable to write file");
2145        writer.write(expected_batch).unwrap();
2146        writer.close().unwrap();
2147
2148        let mut record_batch_reader =
2149            ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
2150
2151        let actual_batch = record_batch_reader
2152            .next()
2153            .expect("No batch found")
2154            .expect("Unable to get batch");
2155
2156        assert_eq!(expected_batch.schema(), actual_batch.schema());
2157        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
2158        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
2159        for i in 0..expected_batch.num_columns() {
2160            let expected_data = expected_batch.column(i).to_data();
2161            let actual_data = actual_batch.column(i).to_data();
2162            validate(&expected_data, &actual_data);
2163        }
2164
2165        file
2166    }
2167
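    /// Round-trips `expected_batch` and asserts that the data read back is fully valid and equal
    /// to the input.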
2168    fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> File {
2169        roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
2170            a.validate_full().expect("valid expected data");
2171            b.validate_full().expect("valid actual data");
2172            assert_eq!(a, b)
2173        })
2174    }
2175
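    /// Options for the single-column roundtrip helpers below.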
2176    struct RoundTripOptions {
2177        values: ArrayRef,
2178        schema: SchemaRef,
2179        bloom_filter: bool,
2180        bloom_filter_position: BloomFilterPosition,
2181    }
2182
2183    impl RoundTripOptions {
2184        fn new(values: ArrayRef, nullable: bool) -> Self {
2185            let data_type = values.data_type().clone();
2186            let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
2187            Self {
2188                values,
2189                schema: Arc::new(schema),
2190                bloom_filter: false,
2191                bloom_filter_position: BloomFilterPosition::AfterRowGroup,
2192            }
2193        }
2194    }
2195
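    /// Builds a one-column schema around `values` and round-trips it with the default options.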
2196    fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<File> {
2197        one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
2198    }
2199
2200    fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<File> {
2201        let mut options = RoundTripOptions::new(values, false);
2202        options.schema = schema;
2203        one_column_roundtrip_with_options(options)
2204    }
2205
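    /// Round-trips a single-column batch across the encodings relevant to its data type, with
    /// dictionary encoding enabled and disabled, both writer versions, and several row group sizes.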
2206    fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<File> {
2207        let RoundTripOptions {
2208            values,
2209            schema,
2210            bloom_filter,
2211            bloom_filter_position,
2212        } = options;
2213
2214        let encodings = match values.data_type() {
2215            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
2216                vec![
2217                    Encoding::PLAIN,
2218                    Encoding::DELTA_BYTE_ARRAY,
2219                    Encoding::DELTA_LENGTH_BYTE_ARRAY,
2220                ]
2221            }
2222            DataType::Int64
2223            | DataType::Int32
2224            | DataType::Int16
2225            | DataType::Int8
2226            | DataType::UInt64
2227            | DataType::UInt32
2228            | DataType::UInt16
2229            | DataType::UInt8 => vec![
2230                Encoding::PLAIN,
2231                Encoding::DELTA_BINARY_PACKED,
2232                Encoding::BYTE_STREAM_SPLIT,
2233            ],
2234            DataType::Float32 | DataType::Float64 => {
2235                vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
2236            }
2237            _ => vec![Encoding::PLAIN],
2238        };
2239
2240        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2241
2242        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2243
2244        let mut files = vec![];
2245        for dictionary_size in [0, 1, 1024] {
2246            for encoding in &encodings {
2247                for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2248                    for row_group_size in row_group_sizes {
2249                        let props = WriterProperties::builder()
2250                            .set_writer_version(version)
2251                            .set_max_row_group_size(row_group_size)
2252                            .set_dictionary_enabled(dictionary_size != 0)
2253                            .set_dictionary_page_size_limit(dictionary_size.max(1))
2254                            .set_encoding(*encoding)
2255                            .set_bloom_filter_enabled(bloom_filter)
2256                            .set_bloom_filter_position(bloom_filter_position)
2257                            .build();
2258
2259                        files.push(roundtrip_opts(&expected_batch, props))
2260                    }
2261                }
2262            }
2263        }
2264        files
2265    }
2266
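    /// Round-trips a non-nullable column built from the values in `iter`.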
2267    fn values_required<A, I>(iter: I) -> Vec<File>
2268    where
2269        A: From<Vec<I::Item>> + Array + 'static,
2270        I: IntoIterator,
2271    {
2272        let raw_values: Vec<_> = iter.into_iter().collect();
2273        let values = Arc::new(A::from(raw_values));
2274        one_column_roundtrip(values, false)
2275    }
2276
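    /// Round-trips a nullable column built from the values in `iter`, nulling out every other slot.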
2277    fn values_optional<A, I>(iter: I) -> Vec<File>
2278    where
2279        A: From<Vec<Option<I::Item>>> + Array + 'static,
2280        I: IntoIterator,
2281    {
2282        let optional_raw_values: Vec<_> = iter
2283            .into_iter()
2284            .enumerate()
2285            .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
2286            .collect();
2287        let optional_values = Arc::new(A::from(optional_raw_values));
2288        one_column_roundtrip(optional_values, true)
2289    }
2290
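    /// Runs both the required and the optional roundtrip for the values in `iter`.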
2291    fn required_and_optional<A, I>(iter: I)
2292    where
2293        A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
2294        I: IntoIterator + Clone,
2295    {
2296        values_required::<A, I>(iter.clone());
2297        values_optional::<A, I>(iter);
2298    }
2299
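    /// Opens the first written file with bloom filter reading enabled, collects the bloom filters
    /// for `file_column` from every row group, and asserts that each of `positive_values` is found
    /// in some filter while none of `negative_values` is.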
2300    fn check_bloom_filter<T: AsBytes>(
2301        files: Vec<File>,
2302        file_column: String,
2303        positive_values: Vec<T>,
2304        negative_values: Vec<T>,
2305    ) {
2306        files.into_iter().take(1).for_each(|file| {
2307            let file_reader = SerializedFileReader::new_with_options(
2308                file,
2309                ReadOptionsBuilder::new()
2310                    .with_reader_properties(
2311                        ReaderProperties::builder()
2312                            .set_read_bloom_filter(true)
2313                            .build(),
2314                    )
2315                    .build(),
2316            )
2317            .expect("Unable to open file as Parquet");
2318            let metadata = file_reader.metadata();
2319
2320            // Gets bloom filters from all row groups.
2321            let mut bloom_filters: Vec<_> = vec![];
2322            for (ri, row_group) in metadata.row_groups().iter().enumerate() {
2323                if let Some((column_index, _)) = row_group
2324                    .columns()
2325                    .iter()
2326                    .enumerate()
2327                    .find(|(_, column)| column.column_path().string() == file_column)
2328                {
2329                    let row_group_reader = file_reader
2330                        .get_row_group(ri)
2331                        .expect("Unable to read row group");
2332                    if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
2333                        bloom_filters.push(sbbf.clone());
2334                    } else {
2335                        panic!("No bloom filter for column named {file_column} found");
2336                    }
2337                } else {
2338                    panic!("No column named {file_column} found");
2339                }
2340            }
2341
2342            positive_values.iter().for_each(|value| {
2343                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2344                assert!(
2345                    found.is_some(),
2346                    "Value {:?} should be in bloom filter",
2347                    value.as_bytes()
2348                );
2349            });
2350
2351            negative_values.iter().for_each(|value| {
2352                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2353                assert!(
2354                    found.is_none(),
2355                    "Value {:?} should not be in bloom filter",
2356                    value.as_bytes()
2357                );
2358            });
2359        });
2360    }
2361
2362    #[test]
2363    fn all_null_primitive_single_column() {
2364        let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
2365        one_column_roundtrip(values, true);
2366    }
2367    #[test]
2368    fn null_single_column() {
2369        let values = Arc::new(NullArray::new(SMALL_SIZE));
2370        one_column_roundtrip(values, true);
2371        // null arrays are always nullable; a test with non-nullable nulls fails
2372    }
2373
2374    #[test]
2375    fn bool_single_column() {
2376        required_and_optional::<BooleanArray, _>(
2377            [true, false].iter().cycle().copied().take(SMALL_SIZE),
2378        );
2379    }
2380
2381    #[test]
2382    fn bool_large_single_column() {
2383        let values = Arc::new(
2384            [None, Some(true), Some(false)]
2385                .iter()
2386                .cycle()
2387                .copied()
2388                .take(200_000)
2389                .collect::<BooleanArray>(),
2390        );
2391        let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
2392        let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2393        let file = tempfile::tempfile().unwrap();
2394
2395        let mut writer =
2396            ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
2397                .expect("Unable to write file");
2398        writer.write(&expected_batch).unwrap();
2399        writer.close().unwrap();
2400    }
2401
2402    #[test]
2403    fn check_page_offset_index_with_nan() {
2404        let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
2405        let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
2406        let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2407
2408        let mut out = Vec::with_capacity(1024);
2409        let mut writer =
2410            ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
2411        writer.write(&batch).unwrap();
2412        let file_meta_data = writer.close().unwrap();
2413        for row_group in file_meta_data.row_groups {
2414            for column in row_group.columns {
2415                assert!(column.offset_index_offset.is_some());
2416                assert!(column.offset_index_length.is_some());
2417                assert!(column.column_index_offset.is_none());
2418                assert!(column.column_index_length.is_none());
2419            }
2420        }
2421    }
2422
2423    #[test]
2424    fn i8_single_column() {
2425        required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
2426    }
2427
2428    #[test]
2429    fn i16_single_column() {
2430        required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
2431    }
2432
2433    #[test]
2434    fn i32_single_column() {
2435        required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
2436    }
2437
2438    #[test]
2439    fn i64_single_column() {
2440        required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
2441    }
2442
2443    #[test]
2444    fn u8_single_column() {
2445        required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
2446    }
2447
2448    #[test]
2449    fn u16_single_column() {
2450        required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
2451    }
2452
2453    #[test]
2454    fn u32_single_column() {
2455        required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
2456    }
2457
2458    #[test]
2459    fn u64_single_column() {
2460        required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
2461    }
2462
2463    #[test]
2464    fn f32_single_column() {
2465        required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
2466    }
2467
2468    #[test]
2469    fn f64_single_column() {
2470        required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
2471    }
2472
2473    // The timestamp array types don't implement From<Vec<T>> because they need the timezone
2474    // argument, and they also don't support being built from a Vec<Option<T>>, so call
2475    // one_column_roundtrip manually instead of calling required_and_optional for these tests.
2476
2477    #[test]
2478    fn timestamp_second_single_column() {
2479        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2480        let values = Arc::new(TimestampSecondArray::from(raw_values));
2481
2482        one_column_roundtrip(values, false);
2483    }
2484
2485    #[test]
2486    fn timestamp_millisecond_single_column() {
2487        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2488        let values = Arc::new(TimestampMillisecondArray::from(raw_values));
2489
2490        one_column_roundtrip(values, false);
2491    }
2492
2493    #[test]
2494    fn timestamp_microsecond_single_column() {
2495        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2496        let values = Arc::new(TimestampMicrosecondArray::from(raw_values));
2497
2498        one_column_roundtrip(values, false);
2499    }
2500
2501    #[test]
2502    fn timestamp_nanosecond_single_column() {
2503        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2504        let values = Arc::new(TimestampNanosecondArray::from(raw_values));
2505
2506        one_column_roundtrip(values, false);
2507    }
2508
2509    #[test]
2510    fn date32_single_column() {
2511        required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
2512    }
2513
2514    #[test]
2515    fn date64_single_column() {
2516        // Date64 must be a multiple of 86400000, see ARROW-10925
2517        required_and_optional::<Date64Array, _>(
2518            (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
2519        );
2520    }
2521
2522    #[test]
2523    fn time32_second_single_column() {
2524        required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
2525    }
2526
2527    #[test]
2528    fn time32_millisecond_single_column() {
2529        required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
2530    }
2531
2532    #[test]
2533    fn time64_microsecond_single_column() {
2534        required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
2535    }
2536
2537    #[test]
2538    fn time64_nanosecond_single_column() {
2539        required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
2540    }
2541
2542    #[test]
2543    fn duration_second_single_column() {
2544        required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
2545    }
2546
2547    #[test]
2548    fn duration_millisecond_single_column() {
2549        required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
2550    }
2551
2552    #[test]
2553    fn duration_microsecond_single_column() {
2554        required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
2555    }
2556
2557    #[test]
2558    fn duration_nanosecond_single_column() {
2559        required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
2560    }
2561
2562    #[test]
2563    fn interval_year_month_single_column() {
2564        required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
2565    }
2566
2567    #[test]
2568    fn interval_day_time_single_column() {
2569        required_and_optional::<IntervalDayTimeArray, _>(vec![
2570            IntervalDayTime::new(0, 1),
2571            IntervalDayTime::new(0, 3),
2572            IntervalDayTime::new(3, -2),
2573            IntervalDayTime::new(-200, 4),
2574        ]);
2575    }
2576
2577    #[test]
2578    #[should_panic(
2579        expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
2580    )]
2581    fn interval_month_day_nano_single_column() {
2582        required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
2583            IntervalMonthDayNano::new(0, 1, 5),
2584            IntervalMonthDayNano::new(0, 3, 2),
2585            IntervalMonthDayNano::new(3, -2, -5),
2586            IntervalMonthDayNano::new(-200, 4, -1),
2587        ]);
2588    }
2589
2590    #[test]
2591    fn binary_single_column() {
2592        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2593        let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2594        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2595
2596        // BinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2597        values_required::<BinaryArray, _>(many_vecs_iter);
2598    }
2599
2600    #[test]
2601    fn binary_view_single_column() {
2602        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2603        let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2604        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2605
2606        // BinaryViewArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2607        values_required::<BinaryViewArray, _>(many_vecs_iter);
2608    }
2609
2610    #[test]
2611    fn i32_column_bloom_filter_at_end() {
2612        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2613        let mut options = RoundTripOptions::new(array, false);
2614        options.bloom_filter = true;
2615        options.bloom_filter_position = BloomFilterPosition::End;
2616
2617        let files = one_column_roundtrip_with_options(options);
2618        check_bloom_filter(
2619            files,
2620            "col".to_string(),
2621            (0..SMALL_SIZE as i32).collect(),
2622            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2623        );
2624    }
2625
2626    #[test]
2627    fn i32_column_bloom_filter() {
2628        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2629        let mut options = RoundTripOptions::new(array, false);
2630        options.bloom_filter = true;
2631
2632        let files = one_column_roundtrip_with_options(options);
2633        check_bloom_filter(
2634            files,
2635            "col".to_string(),
2636            (0..SMALL_SIZE as i32).collect(),
2637            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2638        );
2639    }
2640
2641    #[test]
2642    fn binary_column_bloom_filter() {
2643        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2644        let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2645        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2646
2647        let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
2648        let mut options = RoundTripOptions::new(array, false);
2649        options.bloom_filter = true;
2650
2651        let files = one_column_roundtrip_with_options(options);
2652        check_bloom_filter(
2653            files,
2654            "col".to_string(),
2655            many_vecs,
2656            vec![vec![(SMALL_SIZE + 1) as u8]],
2657        );
2658    }
2659
2660    #[test]
2661    fn empty_string_null_column_bloom_filter() {
2662        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2663        let raw_strs = raw_values.iter().map(|s| s.as_str());
2664
2665        let array = Arc::new(StringArray::from_iter_values(raw_strs));
2666        let mut options = RoundTripOptions::new(array, false);
2667        options.bloom_filter = true;
2668
2669        let files = one_column_roundtrip_with_options(options);
2670
2671        let optional_raw_values: Vec<_> = raw_values
2672            .iter()
2673            .enumerate()
2674            .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
2675            .collect();
2676        // For null slots, empty string should not be in bloom filter.
2677        check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
2678    }
2679
2680    #[test]
2681    fn large_binary_single_column() {
2682        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2683        let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2684        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2685
2686        // LargeBinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2687        values_required::<LargeBinaryArray, _>(many_vecs_iter);
2688    }
2689
2690    #[test]
2691    fn fixed_size_binary_single_column() {
2692        let mut builder = FixedSizeBinaryBuilder::new(4);
2693        builder.append_value(b"0123").unwrap();
2694        builder.append_null();
2695        builder.append_value(b"8910").unwrap();
2696        builder.append_value(b"1112").unwrap();
2697        let array = Arc::new(builder.finish());
2698
2699        one_column_roundtrip(array, true);
2700    }
2701
2702    #[test]
2703    fn string_single_column() {
2704        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2705        let raw_strs = raw_values.iter().map(|s| s.as_str());
2706
2707        required_and_optional::<StringArray, _>(raw_strs);
2708    }
2709
2710    #[test]
2711    fn large_string_single_column() {
2712        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2713        let raw_strs = raw_values.iter().map(|s| s.as_str());
2714
2715        required_and_optional::<LargeStringArray, _>(raw_strs);
2716    }
2717
2718    #[test]
2719    fn string_view_single_column() {
2720        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2721        let raw_strs = raw_values.iter().map(|s| s.as_str());
2722
2723        required_and_optional::<StringViewArray, _>(raw_strs);
2724    }
2725
2726    #[test]
2727    fn null_list_single_column() {
2728        let null_field = Field::new_list_field(DataType::Null, true);
2729        let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);
2730
2731        let schema = Schema::new(vec![list_field]);
2732
2733        // Build [[], null, [null, null]]
2734        let a_values = NullArray::new(2);
2735        let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
2736        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2737            DataType::Null,
2738            true,
2739        ))))
2740        .len(3)
2741        .add_buffer(a_value_offsets)
2742        .null_bit_buffer(Some(Buffer::from([0b00000101])))
2743        .add_child_data(a_values.into_data())
2744        .build()
2745        .unwrap();
2746
2747        let a = ListArray::from(a_list_data);
2748
2749        assert!(a.is_valid(0));
2750        assert!(!a.is_valid(1));
2751        assert!(a.is_valid(2));
2752
2753        assert_eq!(a.value(0).len(), 0);
2754        assert_eq!(a.value(2).len(), 2);
2755        assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);
2756
2757        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2758        roundtrip(batch, None);
2759    }
2760
2761    #[test]
2762    fn list_single_column() {
2763        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2764        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2765        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2766            DataType::Int32,
2767            false,
2768        ))))
2769        .len(5)
2770        .add_buffer(a_value_offsets)
2771        .null_bit_buffer(Some(Buffer::from([0b00011011])))
2772        .add_child_data(a_values.into_data())
2773        .build()
2774        .unwrap();
2775
2776        assert_eq!(a_list_data.null_count(), 1);
2777
2778        let a = ListArray::from(a_list_data);
2779        let values = Arc::new(a);
2780
2781        one_column_roundtrip(values, true);
2782    }
2783
2784    #[test]
2785    fn large_list_single_column() {
2786        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2787        let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
2788        let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
2789            "large_item",
2790            DataType::Int32,
2791            true,
2792        ))))
2793        .len(5)
2794        .add_buffer(a_value_offsets)
2795        .add_child_data(a_values.into_data())
2796        .null_bit_buffer(Some(Buffer::from([0b00011011])))
2797        .build()
2798        .unwrap();
2799
2800        // I think this setup is incorrect because this should pass
2801        assert_eq!(a_list_data.null_count(), 1);
2802
2803        let a = LargeListArray::from(a_list_data);
2804        let values = Arc::new(a);
2805
2806        one_column_roundtrip(values, true);
2807    }
2808
2809    #[test]
2810    fn list_nested_nulls() {
2811        use arrow::datatypes::Int32Type;
2812        let data = vec![
2813            Some(vec![Some(1)]),
2814            Some(vec![Some(2), Some(3)]),
2815            None,
2816            Some(vec![Some(4), Some(5), None]),
2817            Some(vec![None]),
2818            Some(vec![Some(6), Some(7)]),
2819        ];
2820
2821        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
2822        one_column_roundtrip(Arc::new(list), true);
2823
2824        let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
2825        one_column_roundtrip(Arc::new(list), true);
2826    }
2827
2828    #[test]
2829    fn struct_single_column() {
2830        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2831        let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
2832        let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
2833
2834        let values = Arc::new(s);
2835        one_column_roundtrip(values, false);
2836    }
2837
2838    #[test]
2839    fn list_and_map_coerced_names() {
2840        // Create map and list with non-Parquet naming
2841        let list_field =
2842            Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
2843        let map_field = Field::new_map(
2844            "my_map",
2845            "entries",
2846            Field::new("keys", DataType::Int32, false),
2847            Field::new("values", DataType::Int32, true),
2848            false,
2849            true,
2850        );
2851
2852        let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
2853        let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();
2854
2855        let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
2856
2857        // Write data to Parquet but coerce names to match spec
2858        let props = Some(WriterProperties::builder().set_coerce_types(true).build());
2859        let file = tempfile::tempfile().unwrap();
2860        let mut writer =
2861            ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();
2862
2863        let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
2864        writer.write(&batch).unwrap();
2865        let file_metadata = writer.close().unwrap();
2866
2867        // Coerced name of "item" should be "element"
2868        assert_eq!(file_metadata.schema[3].name, "element");
2869        // Coerced name of "entries" should be "key_value"
2870        assert_eq!(file_metadata.schema[5].name, "key_value");
2871        // Coerced name of "keys" should be "key"
2872        assert_eq!(file_metadata.schema[6].name, "key");
2873        // Coerced name of "values" should be "value"
2874        assert_eq!(file_metadata.schema[7].name, "value");
2875
2876        // Double check schema after reading from the file
2877        let reader = SerializedFileReader::new(file).unwrap();
2878        let file_schema = reader.metadata().file_metadata().schema();
2879        let fields = file_schema.get_fields();
2880        let list_field = &fields[0].get_fields()[0];
2881        assert_eq!(list_field.get_fields()[0].name(), "element");
2882        let map_field = &fields[1].get_fields()[0];
2883        assert_eq!(map_field.name(), "key_value");
2884        assert_eq!(map_field.get_fields()[0].name(), "key");
2885        assert_eq!(map_field.get_fields()[1].name(), "value");
2886    }
2887
2888    #[test]
2889    fn fallback_flush_data_page() {
2890        // Tests that Fallback::flush_data_page clears all of its buffers correctly
2891        let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
2892        let values = Arc::new(StringArray::from(raw_values));
2893        let encodings = vec![
2894            Encoding::DELTA_BYTE_ARRAY,
2895            Encoding::DELTA_LENGTH_BYTE_ARRAY,
2896        ];
2897        let data_type = values.data_type().clone();
2898        let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
2899        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2900
2901        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2902        let data_page_size_limit: usize = 32;
2903        let write_batch_size: usize = 16;
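        // a tiny data page size limit and small write batches force frequent page flushes,
        // exercising the fallback encoder's buffer reset between pages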
2904
2905        for encoding in &encodings {
2906            for row_group_size in row_group_sizes {
2907                let props = WriterProperties::builder()
2908                    .set_writer_version(WriterVersion::PARQUET_2_0)
2909                    .set_max_row_group_size(row_group_size)
2910                    .set_dictionary_enabled(false)
2911                    .set_encoding(*encoding)
2912                    .set_data_page_size_limit(data_page_size_limit)
2913                    .set_write_batch_size(write_batch_size)
2914                    .build();
2915
2916                roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
2917                    let string_array_a = StringArray::from(a.clone());
2918                    let string_array_b = StringArray::from(b.clone());
2919                    let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
2920                    let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
2921                    assert_eq!(
2922                        vec_a, vec_b,
2923                        "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
2924                    );
2925                });
2926            }
2927        }
2928    }
2929
2930    #[test]
2931    fn arrow_writer_string_dictionary() {
2932        // define schema
2933        #[allow(deprecated)]
2934        let schema = Arc::new(Schema::new(vec![Field::new_dict(
2935            "dictionary",
2936            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2937            true,
2938            42,
2939            true,
2940        )]));
2941
2942        // create some data
2943        let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
2944            .iter()
2945            .copied()
2946            .collect();
2947
2948        // build a record batch
2949        one_column_roundtrip_with_schema(Arc::new(d), schema);
2950    }
2951
2952    #[test]
2953    fn arrow_writer_primitive_dictionary() {
2954        // define schema
2955        #[allow(deprecated)]
2956        let schema = Arc::new(Schema::new(vec![Field::new_dict(
2957            "dictionary",
2958            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
2959            true,
2960            42,
2961            true,
2962        )]));
2963
2964        // create some data
2965        let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
2966        builder.append(12345678).unwrap();
2967        builder.append_null();
2968        builder.append(22345678).unwrap();
2969        builder.append(12345678).unwrap();
2970        let d = builder.finish();
2971
2972        one_column_roundtrip_with_schema(Arc::new(d), schema);
2973    }
2974
2975    #[test]
2976    fn arrow_writer_decimal128_dictionary() {
2977        let integers = vec![12345, 56789, 34567];
2978
2979        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
2980
2981        let values = Decimal128Array::from(integers.clone())
2982            .with_precision_and_scale(5, 2)
2983            .unwrap();
2984
2985        let array = DictionaryArray::new(keys, Arc::new(values));
2986        one_column_roundtrip(Arc::new(array.clone()), true);
2987
2988        let values = Decimal128Array::from(integers)
2989            .with_precision_and_scale(12, 2)
2990            .unwrap();
2991
2992        let array = array.with_values(Arc::new(values));
2993        one_column_roundtrip(Arc::new(array), true);
2994    }
2995
2996    #[test]
2997    fn arrow_writer_decimal256_dictionary() {
2998        let integers = vec![
2999            i256::from_i128(12345),
3000            i256::from_i128(56789),
3001            i256::from_i128(34567),
3002        ];
3003
3004        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3005
3006        let values = Decimal256Array::from(integers.clone())
3007            .with_precision_and_scale(5, 2)
3008            .unwrap();
3009
3010        let array = DictionaryArray::new(keys, Arc::new(values));
3011        one_column_roundtrip(Arc::new(array.clone()), true);
3012
3013        let values = Decimal256Array::from(integers)
3014            .with_precision_and_scale(12, 2)
3015            .unwrap();
3016
3017        let array = array.with_values(Arc::new(values));
3018        one_column_roundtrip(Arc::new(array), true);
3019    }
3020
3021    #[test]
3022    fn arrow_writer_string_dictionary_unsigned_index() {
3023        // define schema
3024        #[allow(deprecated)]
3025        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3026            "dictionary",
3027            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3028            true,
3029            42,
3030            true,
3031        )]));
3032
3033        // create some data
3034        let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3035            .iter()
3036            .copied()
3037            .collect();
3038
3039        one_column_roundtrip_with_schema(Arc::new(d), schema);
3040    }
3041
3042    #[test]
3043    fn u32_min_max() {
3044        // check values roundtrip through parquet
3045        let src = [
3046            u32::MIN,
3047            u32::MIN + 1,
3048            (i32::MAX as u32) - 1,
3049            i32::MAX as u32,
3050            (i32::MAX as u32) + 1,
3051            u32::MAX - 1,
3052            u32::MAX,
3053        ];
3054        let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
3055        let files = one_column_roundtrip(values, false);
3056
3057        for file in files {
3058            // check statistics are valid
3059            let reader = SerializedFileReader::new(file).unwrap();
3060            let metadata = reader.metadata();
3061
3062            let mut row_offset = 0;
3063            for row_group in metadata.row_groups() {
3064                assert_eq!(row_group.num_columns(), 1);
3065                let column = row_group.column(0);
3066
3067                let num_values = column.num_values() as usize;
3068                let src_slice = &src[row_offset..row_offset + num_values];
3069                row_offset += column.num_values() as usize;
3070
3071                let stats = column.statistics().unwrap();
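                // Arrow UInt32 data is stored using Parquet's physical INT32 type, so the
                // statistics come back as Int32 and are reinterpreted as u32 for comparison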
3072                if let Statistics::Int32(stats) = stats {
3073                    assert_eq!(
3074                        *stats.min_opt().unwrap() as u32,
3075                        *src_slice.iter().min().unwrap()
3076                    );
3077                    assert_eq!(
3078                        *stats.max_opt().unwrap() as u32,
3079                        *src_slice.iter().max().unwrap()
3080                    );
3081                } else {
3082                    panic!("Statistics::Int32 missing")
3083                }
3084            }
3085        }
3086    }
3087
3088    #[test]
3089    fn u64_min_max() {
3090        // check values roundtrip through parquet
3091        let src = [
3092            u64::MIN,
3093            u64::MIN + 1,
3094            (i64::MAX as u64) - 1,
3095            i64::MAX as u64,
3096            (i64::MAX as u64) + 1,
3097            u64::MAX - 1,
3098            u64::MAX,
3099        ];
3100        let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
3101        let files = one_column_roundtrip(values, false);
3102
3103        for file in files {
3104            // check statistics are valid
3105            let reader = SerializedFileReader::new(file).unwrap();
3106            let metadata = reader.metadata();
3107
3108            let mut row_offset = 0;
3109            for row_group in metadata.row_groups() {
3110                assert_eq!(row_group.num_columns(), 1);
3111                let column = row_group.column(0);
3112
3113                let num_values = column.num_values() as usize;
3114                let src_slice = &src[row_offset..row_offset + num_values];
3115                row_offset += column.num_values() as usize;
3116
3117                let stats = column.statistics().unwrap();
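                // likewise, UInt64 data uses Parquet's physical INT64 type, so the statistics
                // are Int64 and are reinterpreted as u64 for comparison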
3118                if let Statistics::Int64(stats) = stats {
3119                    assert_eq!(
3120                        *stats.min_opt().unwrap() as u64,
3121                        *src_slice.iter().min().unwrap()
3122                    );
3123                    assert_eq!(
3124                        *stats.max_opt().unwrap() as u64,
3125                        *src_slice.iter().max().unwrap()
3126                    );
3127                } else {
3128                    panic!("Statistics::Int64 missing")
3129                }
3130            }
3131        }
3132    }
3133
3134    #[test]
3135    fn statistics_null_counts_only_nulls() {
3136        // check that null-count statistics are correct for columns containing only NULLs
3137        let values = Arc::new(UInt64Array::from(vec![None, None]));
3138        let files = one_column_roundtrip(values, true);
3139
3140        for file in files {
3141            // check statistics are valid
3142            let reader = SerializedFileReader::new(file).unwrap();
3143            let metadata = reader.metadata();
3144            assert_eq!(metadata.num_row_groups(), 1);
3145            let row_group = metadata.row_group(0);
3146            assert_eq!(row_group.num_columns(), 1);
3147            let column = row_group.column(0);
3148            let stats = column.statistics().unwrap();
3149            assert_eq!(stats.null_count_opt(), Some(2));
3150        }
3151    }
3152
3153    #[test]
3154    fn test_list_of_struct_roundtrip() {
3155        // define schema
3156        let int_field = Field::new("a", DataType::Int32, true);
3157        let int_field2 = Field::new("b", DataType::Int32, true);
3158
3159        let int_builder = Int32Builder::with_capacity(10);
3160        let int_builder2 = Int32Builder::with_capacity(10);
3161
3162        let struct_builder = StructBuilder::new(
3163            vec![int_field, int_field2],
3164            vec![Box::new(int_builder), Box::new(int_builder2)],
3165        );
3166        let mut list_builder = ListBuilder::new(struct_builder);
3167
3168        // Construct the following array
3169        // [{a: 1, b: 2}], [], null, [null, null], [{a: null, b: 3}], [{a: 2, b: null}]
3170
3171        // [{a: 1, b: 2}]
3172        let values = list_builder.values();
3173        values
3174            .field_builder::<Int32Builder>(0)
3175            .unwrap()
3176            .append_value(1);
3177        values
3178            .field_builder::<Int32Builder>(1)
3179            .unwrap()
3180            .append_value(2);
3181        values.append(true);
3182        list_builder.append(true);
3183
3184        // []
3185        list_builder.append(true);
3186
3187        // null
3188        list_builder.append(false);
3189
3190        // [null, null]
3191        let values = list_builder.values();
3192        values
3193            .field_builder::<Int32Builder>(0)
3194            .unwrap()
3195            .append_null();
3196        values
3197            .field_builder::<Int32Builder>(1)
3198            .unwrap()
3199            .append_null();
3200        values.append(false);
3201        values
3202            .field_builder::<Int32Builder>(0)
3203            .unwrap()
3204            .append_null();
3205        values
3206            .field_builder::<Int32Builder>(1)
3207            .unwrap()
3208            .append_null();
3209        values.append(false);
3210        list_builder.append(true);
3211
3212        // [{a: null, b: 3}]
3213        let values = list_builder.values();
3214        values
3215            .field_builder::<Int32Builder>(0)
3216            .unwrap()
3217            .append_null();
3218        values
3219            .field_builder::<Int32Builder>(1)
3220            .unwrap()
3221            .append_value(3);
3222        values.append(true);
3223        list_builder.append(true);
3224
3225        // [{a: 2, b: null}]
3226        let values = list_builder.values();
3227        values
3228            .field_builder::<Int32Builder>(0)
3229            .unwrap()
3230            .append_value(2);
3231        values
3232            .field_builder::<Int32Builder>(1)
3233            .unwrap()
3234            .append_null();
3235        values.append(true);
3236        list_builder.append(true);
3237
3238        let array = Arc::new(list_builder.finish());
3239
3240        one_column_roundtrip(array, true);
3241    }
3242
3243    fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
3244        metadata.row_groups().iter().map(|x| x.num_rows()).collect()
3245    }
3246
3247    #[test]
3248    fn test_aggregates_records() {
3249        let arrays = [
3250            Int32Array::from((0..100).collect::<Vec<_>>()),
3251            Int32Array::from((0..50).collect::<Vec<_>>()),
3252            Int32Array::from((200..500).collect::<Vec<_>>()),
3253        ];
3254
3255        let schema = Arc::new(Schema::new(vec![Field::new(
3256            "int",
3257            ArrowDataType::Int32,
3258            false,
3259        )]));
3260
3261        let file = tempfile::tempfile().unwrap();
3262
3263        let props = WriterProperties::builder()
3264            .set_max_row_group_size(200)
3265            .build();
3266
3267        let mut writer =
3268            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
3269
3270        for array in arrays {
3271            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
3272            writer.write(&batch).unwrap();
3273        }
3274
3275        writer.close().unwrap();
3276
3277        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
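        // 100 + 50 + 300 = 450 input rows split into row groups of at most 200 rows each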
3278        assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);
3279
3280        let batches = builder
3281            .with_batch_size(100)
3282            .build()
3283            .unwrap()
3284            .collect::<ArrowResult<Vec<_>>>()
3285            .unwrap();
3286
3287        assert_eq!(batches.len(), 5);
3288        assert!(batches.iter().all(|x| x.num_columns() == 1));
3289
3290        let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3291
3292        assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
3293
3294        let values: Vec<_> = batches
3295            .iter()
3296            .flat_map(|x| {
3297                x.column(0)
3298                    .as_any()
3299                    .downcast_ref::<Int32Array>()
3300                    .unwrap()
3301                    .values()
3302                    .iter()
3303                    .cloned()
3304            })
3305            .collect();
3306
3307        let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
3308        assert_eq!(&values, &expected_values)
3309    }
3310
3311    #[test]
3312    fn complex_aggregate() {
3313        // Tests aggregating nested data
3314        let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
3315        let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
3316        let struct_a = Arc::new(Field::new(
3317            "struct_a",
3318            DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
3319            true,
3320        ));
3321
3322        let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
3323        let struct_b = Arc::new(Field::new(
3324            "struct_b",
3325            DataType::Struct(vec![list_a.clone()].into()),
3326            false,
3327        ));
3328
3329        let schema = Arc::new(Schema::new(vec![struct_b]));
3330
3331        // create nested data
3332        let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
3333        let field_b_array =
3334            Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);
3335
3336        let struct_a_array = StructArray::from(vec![
3337            (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
3338            (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
3339        ]);
3340
3341        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3342            .len(5)
3343            .add_buffer(Buffer::from_iter(vec![
3344                0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
3345            ]))
3346            .null_bit_buffer(Some(Buffer::from_iter(vec![
3347                true, false, true, false, true,
3348            ])))
3349            .child_data(vec![struct_a_array.into_data()])
3350            .build()
3351            .unwrap();
3352
3353        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3354        let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);
3355
3356        let batch1 =
3357            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3358                .unwrap();
3359
3360        let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
3361        let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);
3362
3363        let struct_a_array = StructArray::from(vec![
3364            (field_a, Arc::new(field_a_array) as ArrayRef),
3365            (field_b, Arc::new(field_b_array) as ArrayRef),
3366        ]);
3367
3368        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3369            .len(2)
3370            .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
3371            .child_data(vec![struct_a_array.into_data()])
3372            .build()
3373            .unwrap();
3374
3375        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3376        let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);
3377
3378        let batch2 =
3379            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3380                .unwrap();
3381
3382        let batches = &[batch1, batch2];
3383
3384        // Verify data is as expected
3385
3386        let expected = r#"
3387            +-------------------------------------------------------------------------------------------------------+
3388            | struct_b                                                                                              |
3389            +-------------------------------------------------------------------------------------------------------+
3390            | {list: [{leaf_a: 1, leaf_b: 1}]}                                                                      |
3391            | {list: }                                                                                              |
3392            | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]}                                               |
3393            | {list: }                                                                                              |
3394            | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]}                                                |
3395            | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
3396            | {list: [{leaf_a: 10, leaf_b: }]}                                                                      |
3397            +-------------------------------------------------------------------------------------------------------+
3398        "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");
3399
3400        let actual = pretty_format_batches(batches).unwrap().to_string();
3401        assert_eq!(actual, expected);
3402
3403        // Write data
3404        let file = tempfile::tempfile().unwrap();
3405        let props = WriterProperties::builder()
3406            .set_max_row_group_size(6)
3407            .build();
3408
3409        let mut writer =
3410            ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
3411
3412        for batch in batches {
3413            writer.write(batch).unwrap();
3414        }
3415        writer.close().unwrap();
3416
3417        // Read Data
3418        // Should have written the entire first batch and the first row of the second batch
3419        // to the first row group, leaving a single row in the second row group
3420
3421        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3422        assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);
3423
3424        let batches = builder
3425            .with_batch_size(2)
3426            .build()
3427            .unwrap()
3428            .collect::<ArrowResult<Vec<_>>>()
3429            .unwrap();
3430
3431        assert_eq!(batches.len(), 4);
3432        let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3433        assert_eq!(&batch_counts, &[2, 2, 2, 1]);
3434
3435        let actual = pretty_format_batches(&batches).unwrap().to_string();
3436        assert_eq!(actual, expected);
3437    }
3438
3439    #[test]
3440    fn test_arrow_writer_metadata() {
3441        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3442        let file_schema = batch_schema.clone().with_metadata(
3443            vec![("foo".to_string(), "bar".to_string())]
3444                .into_iter()
3445                .collect(),
3446        );
3447
3448        let batch = RecordBatch::try_new(
3449            Arc::new(batch_schema),
3450            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3451        )
3452        .unwrap();
3453
3454        let mut buf = Vec::with_capacity(1024);
3455        let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
3456        writer.write(&batch).unwrap();
3457        writer.close().unwrap();
3458    }
3459
3460    #[test]
3461    fn test_arrow_writer_nullable() {
3462        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3463        let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
3464        let file_schema = Arc::new(file_schema);
3465
3466        let batch = RecordBatch::try_new(
3467            Arc::new(batch_schema),
3468            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3469        )
3470        .unwrap();
3471
3472        let mut buf = Vec::with_capacity(1024);
3473        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3474        writer.write(&batch).unwrap();
3475        writer.close().unwrap();
3476
3477        let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
3478        let back = read.next().unwrap().unwrap();
3479        assert_eq!(back.schema(), file_schema);
3480        assert_ne!(back.schema(), batch.schema());
3481        assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
3482    }
3483
3484    #[test]
3485    fn in_progress_accounting() {
3486        // define schema
3487        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3488
3489        // create some data
3490        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3491
3492        // build a record batch
3493        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
3494
3495        let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
3496
3497        // starts empty
3498        assert_eq!(writer.in_progress_size(), 0);
3499        assert_eq!(writer.in_progress_rows(), 0);
3500        assert_eq!(writer.memory_size(), 0);
3501        assert_eq!(writer.bytes_written(), 4); // Initial 4-byte `PAR1` magic header
3502        writer.write(&batch).unwrap();
3503
3504        // updated on write
3505        let initial_size = writer.in_progress_size();
3506        assert!(initial_size > 0);
3507        assert_eq!(writer.in_progress_rows(), 5);
3508        let initial_memory = writer.memory_size();
3509        assert!(initial_memory > 0);
3510        // memory estimate is at least as large as the estimated encoded size
3511        assert!(
3512            initial_size <= initial_memory,
3513            "{initial_size} <= {initial_memory}"
3514        );
3515
3516        // updated on second write
3517        writer.write(&batch).unwrap();
3518        assert!(writer.in_progress_size() > initial_size);
3519        assert_eq!(writer.in_progress_rows(), 10);
3520        assert!(writer.memory_size() > initial_memory);
3521        assert!(
3522            writer.in_progress_size() <= writer.memory_size(),
3523            "in_progress_size {} <= memory_size {}",
3524            writer.in_progress_size(),
3525            writer.memory_size()
3526        );
3527
3528        // in progress tracking is cleared, but the overall data written is updated
3529        let pre_flush_bytes_written = writer.bytes_written();
3530        writer.flush().unwrap();
3531        assert_eq!(writer.in_progress_size(), 0);
3532        assert_eq!(writer.memory_size(), 0);
3533        assert!(writer.bytes_written() > pre_flush_bytes_written);
3534
3535        writer.close().unwrap();
3536    }
3537
3538    #[test]
3539    fn test_writer_all_null() {
3540        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3541        let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
3542        let batch = RecordBatch::try_from_iter(vec![
3543            ("a", Arc::new(a) as ArrayRef),
3544            ("b", Arc::new(b) as ArrayRef),
3545        ])
3546        .unwrap();
3547
3548        let mut buf = Vec::with_capacity(1024);
3549        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
3550        writer.write(&batch).unwrap();
3551        writer.close().unwrap();
3552
3553        let bytes = Bytes::from(buf);
3554        let options = ReadOptionsBuilder::new().with_page_index().build();
3555        let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
3556        let index = reader.metadata().offset_index().unwrap();
3557
3558        assert_eq!(index.len(), 1);
3559        assert_eq!(index[0].len(), 2); // 2 columns
3560        assert_eq!(index[0][0].page_locations().len(), 1); // 1 page
3561        assert_eq!(index[0][1].page_locations().len(), 1); // 1 page
3562    }
3563
3564    #[test]
3565    fn test_disabled_statistics_with_page() {
3566        let file_schema = Schema::new(vec![
3567            Field::new("a", DataType::Utf8, true),
3568            Field::new("b", DataType::Utf8, true),
3569        ]);
3570        let file_schema = Arc::new(file_schema);
3571
3572        let batch = RecordBatch::try_new(
3573            file_schema.clone(),
3574            vec![
3575                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3576                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3577            ],
3578        )
3579        .unwrap();
3580
3581        let props = WriterProperties::builder()
3582            .set_statistics_enabled(EnabledStatistics::None)
3583            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
3584            .build();
3585
3586        let mut buf = Vec::with_capacity(1024);
3587        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
3588        writer.write(&batch).unwrap();
3589
3590        let metadata = writer.close().unwrap();
3591        assert_eq!(metadata.row_groups.len(), 1);
3592        let row_group = &metadata.row_groups[0];
3593        assert_eq!(row_group.columns.len(), 2);
3594        // Column "a" has both offset and column index, as requested
3595        assert!(row_group.columns[0].offset_index_offset.is_some());
3596        assert!(row_group.columns[0].column_index_offset.is_some());
3597        // Column "b" should only have offset index
3598        assert!(row_group.columns[1].offset_index_offset.is_some());
3599        assert!(row_group.columns[1].column_index_offset.is_none());
3600
3601        let options = ReadOptionsBuilder::new().with_page_index().build();
3602        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
3603
3604        let row_group = reader.get_row_group(0).unwrap();
3605        let a_col = row_group.metadata().column(0);
3606        let b_col = row_group.metadata().column(1);
3607
3608        // Column chunk of column "a" should have chunk level statistics
3609        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
3610            let min = byte_array_stats.min_opt().unwrap();
3611            let max = byte_array_stats.max_opt().unwrap();
3612
3613            assert_eq!(min.as_bytes(), b"a");
3614            assert_eq!(max.as_bytes(), b"d");
3615        } else {
3616            panic!("expecting Statistics::ByteArray");
3617        }
3618
3619        // The column chunk for column "b" shouldn't have statistics
3620        assert!(b_col.statistics().is_none());
3621
3622        let offset_index = reader.metadata().offset_index().unwrap();
3623        assert_eq!(offset_index.len(), 1); // 1 row group
3624        assert_eq!(offset_index[0].len(), 2); // 2 columns
3625
3626        let column_index = reader.metadata().column_index().unwrap();
3627        assert_eq!(column_index.len(), 1); // 1 row group
3628        assert_eq!(column_index[0].len(), 2); // 2 columns
3629
3630        let a_idx = &column_index[0][0];
3631        assert!(matches!(a_idx, Index::BYTE_ARRAY(_)), "{a_idx:?}");
3632        let b_idx = &column_index[0][1];
3633        assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
3634    }
3635
3636    #[test]
3637    fn test_disabled_statistics_with_chunk() {
3638        let file_schema = Schema::new(vec![
3639            Field::new("a", DataType::Utf8, true),
3640            Field::new("b", DataType::Utf8, true),
3641        ]);
3642        let file_schema = Arc::new(file_schema);
3643
3644        let batch = RecordBatch::try_new(
3645            file_schema.clone(),
3646            vec![
3647                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3648                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3649            ],
3650        )
3651        .unwrap();
3652
3653        let props = WriterProperties::builder()
3654            .set_statistics_enabled(EnabledStatistics::None)
3655            .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
3656            .build();
3657
3658        let mut buf = Vec::with_capacity(1024);
3659        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
3660        writer.write(&batch).unwrap();
3661
3662        let metadata = writer.close().unwrap();
3663        assert_eq!(metadata.row_groups.len(), 1);
3664        let row_group = &metadata.row_groups[0];
3665        assert_eq!(row_group.columns.len(), 2);
3666        // Column "a" should only have offset index
3667        assert!(row_group.columns[0].offset_index_offset.is_some());
3668        assert!(row_group.columns[0].column_index_offset.is_none());
3669        // Column "b" should only have offset index
3670        assert!(row_group.columns[1].offset_index_offset.is_some());
3671        assert!(row_group.columns[1].column_index_offset.is_none());
3672
3673        let options = ReadOptionsBuilder::new().with_page_index().build();
3674        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
3675
3676        let row_group = reader.get_row_group(0).unwrap();
3677        let a_col = row_group.metadata().column(0);
3678        let b_col = row_group.metadata().column(1);
3679
3680        // Column chunk of column "a" should have chunk level statistics
3681        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
3682            let min = byte_array_stats.min_opt().unwrap();
3683            let max = byte_array_stats.max_opt().unwrap();
3684
3685            assert_eq!(min.as_bytes(), b"a");
3686            assert_eq!(max.as_bytes(), b"d");
3687        } else {
3688            panic!("expecting Statistics::ByteArray");
3689        }
3690
3691        // The column chunk for column "b" shouldn't have statistics
3692        assert!(b_col.statistics().is_none());
3693
3694        let column_index = reader.metadata().column_index().unwrap();
3695        assert_eq!(column_index.len(), 1); // 1 row group
3696        assert_eq!(column_index[0].len(), 2); // 2 columns
3697
3698        let a_idx = &column_index[0][0];
3699        assert!(matches!(a_idx, Index::NONE), "{a_idx:?}");
3700        let b_idx = &column_index[0][1];
3701        assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
3702    }
3703
3704    #[test]
3705    fn test_arrow_writer_skip_metadata() {
3706        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3707        let file_schema = Arc::new(batch_schema.clone());
3708
3709        let batch = RecordBatch::try_new(
3710            Arc::new(batch_schema),
3711            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3712        )
3713        .unwrap();
3714        let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
3715
3716        let mut buf = Vec::with_capacity(1024);
3717        let mut writer =
3718            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
3719        writer.write(&batch).unwrap();
3720        writer.close().unwrap();
3721
3722        let bytes = Bytes::from(buf);
3723        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
3724        assert_eq!(file_schema, *reader_builder.schema());
3725        if let Some(key_value_metadata) = reader_builder
3726            .metadata()
3727            .file_metadata()
3728            .key_value_metadata()
3729        {
3730            assert!(!key_value_metadata
3731                .iter()
3732                .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY));
3733        }
3734    }
3735
3736    #[test]
3737    fn mismatched_schemas() {
3738        let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
3739        let file_schema = Arc::new(Schema::new(vec![Field::new(
3740            "temperature",
3741            DataType::Float64,
3742            false,
3743        )]));
3744
3745        let batch = RecordBatch::try_new(
3746            Arc::new(batch_schema),
3747            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3748        )
3749        .unwrap();
3750
3751        let mut buf = Vec::with_capacity(1024);
3752        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3753
3754        let err = writer.write(&batch).unwrap_err().to_string();
3755        assert_eq!(
3756            err,
3757            "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
3758        );
3759    }
3760
3761    #[test]
3762    // https://github.com/apache/arrow-rs/issues/6988
3763    fn test_roundtrip_empty_schema() {
3764        // create empty record batch with empty schema
3765        let empty_batch = RecordBatch::try_new_with_options(
3766            Arc::new(Schema::empty()),
3767            vec![],
3768            &RecordBatchOptions::default().with_row_count(Some(0)),
3769        )
3770        .unwrap();
3771
3772        // write to parquet
3773        let mut parquet_bytes: Vec<u8> = Vec::new();
3774        let mut writer =
3775            ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
3776        writer.write(&empty_batch).unwrap();
3777        writer.close().unwrap();
3778
3779        // read from parquet
3780        let bytes = Bytes::from(parquet_bytes);
3781        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
3782        assert_eq!(reader.schema(), &empty_batch.schema());
3783        let batches: Vec<_> = reader
3784            .build()
3785            .unwrap()
3786            .collect::<ArrowResult<Vec<_>>>()
3787            .unwrap();
3788        assert_eq!(batches.len(), 0);
3789    }
3790
3791    #[test]
3792    fn test_page_stats_not_written_by_default() {
3793        let string_field = Field::new("a", DataType::Utf8, false);
3794        let schema = Schema::new(vec![string_field]);
3795        let raw_string_values = vec!["Blart Versenwald III"];
3796        let string_values = StringArray::from(raw_string_values.clone());
3797        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();
3798
3799        let props = WriterProperties::builder()
3800            .set_statistics_enabled(EnabledStatistics::Page)
3801            .set_dictionary_enabled(false)
3802            .set_encoding(Encoding::PLAIN)
3803            .set_compression(crate::basic::Compression::UNCOMPRESSED)
3804            .build();
3805
3806        let mut file = roundtrip_opts(&batch, props);
3807
3808        // read file and decode page headers
3809        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
3810        let mut buf = vec![];
3811        file.seek(std::io::SeekFrom::Start(0)).unwrap();
3812        let read = file.read_to_end(&mut buf).unwrap();
3813        assert!(read > 0);
3814
3815        // decode first page header
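        // the file starts with the 4-byte `PAR1` magic, so the first page header begins at offset 4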
3816        let first_page = &buf[4..];
3817        let mut prot = TCompactSliceInputProtocol::new(first_page);
3818        let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
3819        let stats = hdr.data_page_header.unwrap().statistics;
3820
3821        assert!(stats.is_none());
3822    }
3823
3824    #[test]
3825    fn test_page_stats_when_enabled() {
3826        let string_field = Field::new("a", DataType::Utf8, false);
3827        let schema = Schema::new(vec![string_field]);
3828        let raw_string_values = vec!["Blart Versenwald III", "Andrew Lamb"];
3829        let string_values = StringArray::from(raw_string_values.clone());
3830        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();
3831
3832        let props = WriterProperties::builder()
3833            .set_statistics_enabled(EnabledStatistics::Page)
3834            .set_dictionary_enabled(false)
3835            .set_encoding(Encoding::PLAIN)
3836            .set_write_page_header_statistics(true)
3837            .set_compression(crate::basic::Compression::UNCOMPRESSED)
3838            .build();
3839
3840        let mut file = roundtrip_opts(&batch, props);
3841
3842        // read file and decode page headers
3843        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
3844        let mut buf = vec![];
3845        file.seek(std::io::SeekFrom::Start(0)).unwrap();
3846        let read = file.read_to_end(&mut buf).unwrap();
3847        assert!(read > 0);
3848
3849        // decode first page header
3850        let first_page = &buf[4..];
3851        let mut prot = TCompactSliceInputProtocol::new(first_page);
3852        let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
3853        let stats = hdr.data_page_header.unwrap().statistics;
3854
3855        let stats = stats.unwrap();
3856        // check that min/max were actually written to the page
3857        assert!(stats.is_max_value_exact.unwrap());
3858        assert!(stats.is_min_value_exact.unwrap());
3859        assert_eq!(stats.max_value.unwrap(), "Blart Versenwald III".as_bytes());
3860        assert_eq!(stats.min_value.unwrap(), "Andrew Lamb".as_bytes());
3861    }
3862
3863    #[test]
3864    fn test_page_stats_truncation() {
3865        let string_field = Field::new("a", DataType::Utf8, false);
3866        let binary_field = Field::new("b", DataType::Binary, false);
3867        let schema = Schema::new(vec![string_field, binary_field]);
3868
3869        let raw_string_values = vec!["Blart Versenwald III"];
3870        let raw_binary_values = [b"Blart Versenwald III".to_vec()];
3871        let raw_binary_value_refs = raw_binary_values
3872            .iter()
3873            .map(|x| x.as_slice())
3874            .collect::<Vec<_>>();
3875
3876        let string_values = StringArray::from(raw_string_values.clone());
3877        let binary_values = BinaryArray::from(raw_binary_value_refs);
3878        let batch = RecordBatch::try_new(
3879            Arc::new(schema),
3880            vec![Arc::new(string_values), Arc::new(binary_values)],
3881        )
3882        .unwrap();
3883
3884        let props = WriterProperties::builder()
3885            .set_statistics_truncate_length(Some(2))
3886            .set_dictionary_enabled(false)
3887            .set_encoding(Encoding::PLAIN)
3888            .set_write_page_header_statistics(true)
3889            .set_compression(crate::basic::Compression::UNCOMPRESSED)
3890            .build();
3891
3892        let mut file = roundtrip_opts(&batch, props);
3893
3894        // read file and decode page headers
3895        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
3896        let mut buf = vec![];
3897        file.seek(std::io::SeekFrom::Start(0)).unwrap();
3898        let read = file.read_to_end(&mut buf).unwrap();
3899        assert!(read > 0);
3900
3901        // decode first page header
3902        let first_page = &buf[4..];
3903        let mut prot = TCompactSliceInputProtocol::new(first_page);
3904        let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
3905        let stats = hdr.data_page_header.unwrap().statistics;
3906        assert!(stats.is_some());
3907        let stats = stats.unwrap();
3908        // check that min/max were truncated to 2 bytes: min keeps its prefix "Bl", while the truncated max is incremented to "Bm" so it still upper-bounds the data
3909        assert!(!stats.is_max_value_exact.unwrap());
3910        assert!(!stats.is_min_value_exact.unwrap());
3911        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
3912        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
3913
3914        // check second page now
3915        let second_page = &prot.as_slice()[hdr.compressed_page_size as usize..];
3916        let mut prot = TCompactSliceInputProtocol::new(second_page);
3917        let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
3918        let stats = hdr.data_page_header.unwrap().statistics;
3919        assert!(stats.is_some());
3920        let stats = stats.unwrap();
3921        // check that min/max were properly truncated
3922        assert!(!stats.is_max_value_exact.unwrap());
3923        assert!(!stats.is_min_value_exact.unwrap());
3924        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
3925        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
3926    }
3927
3928    #[test]
3929    fn test_page_encoding_statistics_roundtrip() {
3930        let batch_schema = Schema::new(vec![Field::new(
3931            "int32",
3932            arrow_schema::DataType::Int32,
3933            false,
3934        )]);
3935
3936        let batch = RecordBatch::try_new(
3937            Arc::new(batch_schema.clone()),
3938            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3939        )
3940        .unwrap();
3941
3942        let mut file: File = tempfile::tempfile().unwrap();
3943        let mut writer = ArrowWriter::try_new(&mut file, Arc::new(batch_schema), None).unwrap();
3944        writer.write(&batch).unwrap();
3945        let file_metadata = writer.close().unwrap();
3946
3947        assert_eq!(file_metadata.row_groups.len(), 1);
3948        assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
3949        let chunk_meta = file_metadata.row_groups[0].columns[0]
3950            .meta_data
3951            .as_ref()
3952            .expect("column metadata missing");
3953        assert!(chunk_meta.encoding_stats.is_some());
3954        let chunk_page_stats = chunk_meta.encoding_stats.as_ref().unwrap();
3955
3956        // check that the read metadata is also correct
3957        let options = ReadOptionsBuilder::new().with_page_index().build();
3958        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
3959
3960        let rowgroup = reader.get_row_group(0).expect("row group missing");
3961        assert_eq!(rowgroup.num_columns(), 1);
3962        let column = rowgroup.metadata().column(0);
3963        assert!(column.page_encoding_stats().is_some());
3964        let file_page_stats = column.page_encoding_stats().unwrap();
3965        let chunk_stats: Vec<PageEncodingStats> = chunk_page_stats
3966            .iter()
3967            .map(|x| crate::file::page_encoding_stats::try_from_thrift(x).unwrap())
3968            .collect();
3969        assert_eq!(&chunk_stats, file_page_stats);
3970    }
3971
3972    #[test]
3973    fn test_different_dict_page_size_limit() {
3974        let array = Arc::new(Int64Array::from_iter(0..1024 * 1024));
3975        let schema = Arc::new(Schema::new(vec![
3976            Field::new("col0", arrow_schema::DataType::Int64, false),
3977            Field::new("col1", arrow_schema::DataType::Int64, false),
3978        ]));
3979        let batch =
3980            arrow_array::RecordBatch::try_new(schema.clone(), vec![array.clone(), array]).unwrap();
3981
3982        let props = WriterProperties::builder()
3983            .set_dictionary_page_size_limit(1024 * 1024)
3984            .set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 1024 * 1024 * 4)
3985            .build();
3986        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
3987        writer.write(&batch).unwrap();
3988        let data = Bytes::from(writer.into_inner().unwrap());
3989
3990        let mut metadata = ParquetMetaDataReader::new();
3991        metadata.try_parse(&data).unwrap();
3992        let metadata = metadata.finish().unwrap();
3993        let col0_meta = metadata.row_group(0).column(0);
3994        let col1_meta = metadata.row_group(0).column(1);
3995
3996        let get_dict_page_size = move |meta: &ColumnChunkMetaData| {
3997            let mut reader =
3998                SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
3999            let page = reader.get_next_page().unwrap().unwrap();
4000            match page {
4001                Page::DictionaryPage { buf, .. } => buf.len(),
4002                _ => panic!("expected DictionaryPage"),
4003            }
4004        };
4005
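        // col0 is capped by the global 1 MiB dictionary page size limit, while col1's
        // per-column override allows its dictionary page to grow to 4 MiB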
4006        assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
4007        assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
4008    }
4009}