parquet/arrow/arrow_writer/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Contains the writer which writes Arrow data into Parquet files.
19
20use bytes::Bytes;
21use std::io::{Read, Write};
22use std::iter::Peekable;
23use std::slice::Iter;
24use std::sync::{Arc, Mutex};
25use std::vec::IntoIter;
26use thrift::protocol::TCompactOutputProtocol;
27
28use arrow_array::cast::AsArray;
29use arrow_array::types::*;
30use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
31use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};
32
33use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};
34
35use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
36use crate::arrow::ArrowSchemaConverter;
37use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
38use crate::column::page_encryption::PageEncryptor;
39use crate::column::writer::encoder::ColumnValueEncoder;
40use crate::column::writer::{
41    get_column_writer, ColumnCloseResult, ColumnWriter, GenericColumnWriter,
42};
43use crate::data_type::{ByteArray, FixedLenByteArray};
44#[cfg(feature = "encryption")]
45use crate::encryption::encrypt::FileEncryptor;
46use crate::errors::{ParquetError, Result};
47use crate::file::metadata::{KeyValue, RowGroupMetaData};
48use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
49use crate::file::reader::{ChunkReader, Length};
50use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
51use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
52use crate::thrift::TSerializable;
53use levels::{calculate_array_levels, ArrayLevels};
54
55mod byte_array;
56mod levels;
57
58/// Encodes [`RecordBatch`] to parquet
59///
/// Writes Arrow `RecordBatch`es to a Parquet writer. Multiple [`RecordBatch`]es are encoded
/// into the same row group, up to `max_row_group_size` rows. Any remaining rows are
/// flushed on close, so the final row group in the output file may contain fewer than
/// `max_row_group_size` rows.
64///
65/// # Example: Writing `RecordBatch`es
66/// ```
67/// # use std::sync::Arc;
68/// # use bytes::Bytes;
69/// # use arrow_array::{ArrayRef, Int64Array};
70/// # use arrow_array::RecordBatch;
71/// # use parquet::arrow::arrow_writer::ArrowWriter;
72/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
73/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
74/// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
75///
76/// let mut buffer = Vec::new();
77/// let mut writer = ArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
78/// writer.write(&to_write).unwrap();
79/// writer.close().unwrap();
80///
81/// let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap();
82/// let read = reader.next().unwrap().unwrap();
83///
84/// assert_eq!(to_write, read);
85/// ```
86///
87/// # Memory Usage and Limiting
88///
89/// The nature of Parquet requires buffering of an entire row group before it can
90/// be flushed to the underlying writer. Data is mostly buffered in its encoded
91/// form, reducing memory usage. However, some data such as dictionary keys,
92/// large strings or very nested data may still result in non-trivial memory
93/// usage.
94///
95/// See Also:
96/// * [`ArrowWriter::memory_size`]: the current memory usage of the writer.
/// * [`ArrowWriter::in_progress_size`]: the estimated encoded size of the buffered row group.
98///
99/// Call [`Self::flush`] to trigger an early flush of a row group based on a
/// memory threshold and/or global memory pressure. However, smaller row groups
101/// result in higher metadata overheads, and thus may worsen compression ratios
102/// and query performance.
103///
104/// ```no_run
105/// # use std::io::Write;
106/// # use arrow_array::RecordBatch;
107/// # use parquet::arrow::ArrowWriter;
108/// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
109/// # let batch: RecordBatch = todo!();
110/// writer.write(&batch).unwrap();
111/// // Trigger an early flush if anticipated size exceeds 1_000_000
112/// if writer.in_progress_size() > 1_000_000 {
113///     writer.flush().unwrap();
114/// }
115/// ```
116///
117/// ## Type Support
118///
/// The writer supports writing all Arrow [`DataType`]s that have a direct mapping to
/// Parquet types, including [`StructArray`] and [`ListArray`].
121///
122/// The following are not supported:
123///
124/// * [`IntervalMonthDayNanoArray`]: Parquet does not [support nanosecond intervals].
125///
126/// [`DataType`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html
127/// [`StructArray`]: https://docs.rs/arrow/latest/arrow/array/struct.StructArray.html
128/// [`ListArray`]: https://docs.rs/arrow/latest/arrow/array/type.ListArray.html
129/// [`IntervalMonthDayNanoArray`]: https://docs.rs/arrow/latest/arrow/array/type.IntervalMonthDayNanoArray.html
130/// [support nanosecond intervals]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
131///
132/// ## Type Compatibility
/// The writer can write Arrow [`RecordBatch`]es whose schemas are logically equivalent to the
/// writer's schema. This means that for a given column, the writer can accept multiple Arrow
/// [`DataType`]s that contain the same value type.
136///
137/// For example, the following [`DataType`]s are all logically equivalent and can be written
138/// to the same column:
139/// * String, LargeString, StringView
140/// * Binary, LargeBinary, BinaryView
141///
/// The writer will also accept both native and dictionary-encoded arrays, provided the
/// dictionaries contain compatible values.
144/// ```
145/// # use std::sync::Arc;
146/// # use arrow_array::{DictionaryArray, LargeStringArray, RecordBatch, StringArray, UInt8Array};
147/// # use arrow_schema::{DataType, Field, Schema};
148/// # use parquet::arrow::arrow_writer::ArrowWriter;
149/// let record_batch1 = RecordBatch::try_new(
150///    Arc::new(Schema::new(vec![Field::new("col", DataType::LargeUtf8, false)])),
151///    vec![Arc::new(LargeStringArray::from_iter_values(vec!["a", "b"]))]
152///  )
153/// .unwrap();
154///
155/// let mut buffer = Vec::new();
156/// let mut writer = ArrowWriter::try_new(&mut buffer, record_batch1.schema(), None).unwrap();
157/// writer.write(&record_batch1).unwrap();
158///
159/// let record_batch2 = RecordBatch::try_new(
160///     Arc::new(Schema::new(vec![Field::new(
161///         "col",
162///         DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
163///          false,
164///     )])),
165///     vec![Arc::new(DictionaryArray::new(
166///          UInt8Array::from_iter_values(vec![0, 1]),
167///          Arc::new(StringArray::from_iter_values(vec!["b", "c"])),
168///      ))],
169///  )
170///  .unwrap();
171///  writer.write(&record_batch2).unwrap();
///  writer.close().unwrap();
173/// ```
174pub struct ArrowWriter<W: Write> {
175    /// Underlying Parquet writer
176    writer: SerializedFileWriter<W>,
177
178    /// The in-progress row group if any
179    in_progress: Option<ArrowRowGroupWriter>,
180
181    /// A copy of the Arrow schema.
182    ///
183    /// The schema is used to verify that each record batch written has the correct schema
184    arrow_schema: SchemaRef,
185
186    /// Creates new [`ArrowRowGroupWriter`] instances as required
187    row_group_writer_factory: ArrowRowGroupWriterFactory,
188
    /// The maximum number of rows to write to each row group
190    max_row_group_size: usize,
191}
192
193impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
194    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195        let buffered_memory = self.in_progress_size();
196        f.debug_struct("ArrowWriter")
197            .field("writer", &self.writer)
198            .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
199            .field("in_progress_rows", &self.in_progress_rows())
200            .field("arrow_schema", &self.arrow_schema)
201            .field("max_row_group_size", &self.max_row_group_size)
202            .finish()
203    }
204}
205
206impl<W: Write + Send> ArrowWriter<W> {
207    /// Try to create a new Arrow writer
208    ///
209    /// The writer will fail if:
    ///  * a `SerializedFileWriter` cannot be created from the underlying writer
    ///  * the Arrow schema contains unsupported data types, such as Unions
212    pub fn try_new(
213        writer: W,
214        arrow_schema: SchemaRef,
215        props: Option<WriterProperties>,
216    ) -> Result<Self> {
217        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
218        Self::try_new_with_options(writer, arrow_schema, options)
219    }
220
221    /// Try to create a new Arrow writer with [`ArrowWriterOptions`].
222    ///
223    /// The writer will fail if:
    ///  * a `SerializedFileWriter` cannot be created from the underlying writer
    ///  * the Arrow schema contains unsupported data types, such as Unions
226    pub fn try_new_with_options(
227        writer: W,
228        arrow_schema: SchemaRef,
229        options: ArrowWriterOptions,
230    ) -> Result<Self> {
231        let mut props = options.properties;
232        let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
233        if let Some(schema_root) = &options.schema_root {
234            converter = converter.schema_root(schema_root);
235        }
236        let schema = converter.convert(&arrow_schema)?;
237        if !options.skip_arrow_metadata {
238            // add serialized arrow schema
239            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
240        }
241
242        let max_row_group_size = props.max_row_group_size();
243
244        let props_ptr = Arc::new(props);
245        let file_writer =
246            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;
247
248        let row_group_writer_factory =
249            ArrowRowGroupWriterFactory::new(&file_writer, schema, arrow_schema.clone(), props_ptr);
250
251        Ok(Self {
252            writer: file_writer,
253            in_progress: None,
254            arrow_schema,
255            row_group_writer_factory,
256            max_row_group_size,
257        })
258    }
259
260    /// Returns metadata for any flushed row groups
261    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
262        self.writer.flushed_row_groups()
263    }
264
265    /// Estimated memory usage, in bytes, of this `ArrowWriter`
266    ///
    /// This estimate is formed by summing the values of
    /// [`ArrowColumnWriter::memory_size`] for all in-progress columns.
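    ///
    /// For example, a sketch of bounding the writer's buffered memory (the
    /// `1_000_000` byte threshold here is arbitrary):
    ///
    /// ```no_run
    /// # use arrow_array::RecordBatch;
    /// # use parquet::arrow::ArrowWriter;
    /// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
    /// # let batch: RecordBatch = todo!();
    /// writer.write(&batch).unwrap();
    /// // flush early if buffered memory exceeds an (arbitrary) threshold
    /// if writer.memory_size() > 1_000_000 {
    ///     writer.flush().unwrap();
    /// }
    /// ```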
269    pub fn memory_size(&self) -> usize {
270        match &self.in_progress {
271            Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
272            None => 0,
273        }
274    }
275
    /// Anticipated encoded size of the in-progress row group.
    ///
    /// This estimates the size of the row group after it has been completely
    /// encoded, and is formed by summing the values of
    /// [`ArrowColumnWriter::get_estimated_total_bytes`] for all in-progress
    /// columns.
282    pub fn in_progress_size(&self) -> usize {
283        match &self.in_progress {
284            Some(in_progress) => in_progress
285                .writers
286                .iter()
287                .map(|x| x.get_estimated_total_bytes())
288                .sum(),
289            None => 0,
290        }
291    }
292
293    /// Returns the number of rows buffered in the in progress row group
294    pub fn in_progress_rows(&self) -> usize {
295        self.in_progress
296            .as_ref()
297            .map(|x| x.buffered_rows)
298            .unwrap_or_default()
299    }
300
301    /// Returns the number of bytes written by this instance
302    pub fn bytes_written(&self) -> usize {
303        self.writer.bytes_written()
304    }
305
306    /// Encodes the provided [`RecordBatch`]
307    ///
308    /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`]
309    /// rows, the contents of `batch` will be written to one or more row groups such that all but
310    /// the final row group in the file contain [`WriterProperties::max_row_group_size`] rows.
311    ///
312    /// This will fail if the `batch`'s schema does not match the writer's schema.
313    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
314        if batch.num_rows() == 0 {
315            return Ok(());
316        }
317
318        let in_progress = match &mut self.in_progress {
319            Some(in_progress) => in_progress,
320            x => x.insert(
321                self.row_group_writer_factory
322                    .create_row_group_writer(self.writer.flushed_row_groups().len())?,
323            ),
324        };
325
326        // If would exceed max_row_group_size, split batch
327        if in_progress.buffered_rows + batch.num_rows() > self.max_row_group_size {
328            let to_write = self.max_row_group_size - in_progress.buffered_rows;
329            let a = batch.slice(0, to_write);
330            let b = batch.slice(to_write, batch.num_rows() - to_write);
331            self.write(&a)?;
332            return self.write(&b);
333        }
334
335        in_progress.write(batch)?;
336
337        if in_progress.buffered_rows >= self.max_row_group_size {
338            self.flush()?
339        }
340        Ok(())
341    }
342
    /// Writes the given `buf` bytes to the internal buffer.
    ///
    /// It is safe to use this method to write data to the underlying writer,
    /// because it ensures that the buffering and byte-counting layers are used.
347    pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
348        self.writer.write_all(buf)
349    }
350
351    /// Flushes all buffered rows into a new row group
352    pub fn flush(&mut self) -> Result<()> {
353        let in_progress = match self.in_progress.take() {
354            Some(in_progress) => in_progress,
355            None => return Ok(()),
356        };
357
358        let mut row_group_writer = self.writer.next_row_group()?;
359        for chunk in in_progress.close()? {
360            chunk.append_to_row_group(&mut row_group_writer)?;
361        }
362        row_group_writer.close()?;
363        Ok(())
364    }
365
    /// Additional [`KeyValue`] metadata to be written in addition to that from [`WriterProperties`]
    ///
    /// This method provides a way to append key-value metadata after writing `RecordBatch`es.
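    ///
    /// # Example
    ///
    /// A small sketch, assuming an open writer; the key and value shown here are arbitrary:
    ///
    /// ```no_run
    /// # use parquet::arrow::arrow_writer::ArrowWriter;
    /// # use parquet::file::metadata::KeyValue;
    /// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
    /// // append an arbitrary key/value pair to the file metadata
    /// writer.append_key_value_metadata(KeyValue::new(
    ///     "my_key".to_string(),
    ///     "my_value".to_string(),
    /// ));
    /// ```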
369    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
370        self.writer.append_key_value_metadata(kv_metadata)
371    }
372
373    /// Returns a reference to the underlying writer.
374    pub fn inner(&self) -> &W {
375        self.writer.inner()
376    }
377
378    /// Returns a mutable reference to the underlying writer.
379    ///
380    /// **Warning**: if you write directly to this writer, you will skip
    /// the `TrackedWrite` buffering and byte-counting layers. This will cause
    /// the offsets and sizes recorded in the file footer to diverge from reality,
383    /// resulting in an unreadable or corrupted Parquet file.
384    ///
385    /// If you want to write safely to the underlying writer, use [`Self::write_all`].
386    pub fn inner_mut(&mut self) -> &mut W {
387        self.writer.inner_mut()
388    }
389
390    /// Flushes any outstanding data and returns the underlying writer.
391    pub fn into_inner(mut self) -> Result<W> {
392        self.flush()?;
393        self.writer.into_inner()
394    }
395
396    /// Close and finalize the underlying Parquet writer
397    ///
398    /// Unlike [`Self::close`] this does not consume self
399    ///
400    /// Attempting to write after calling finish will result in an error
401    pub fn finish(&mut self) -> Result<crate::format::FileMetaData> {
402        self.flush()?;
403        self.writer.finish()
404    }
405
406    /// Close and finalize the underlying Parquet writer
407    pub fn close(mut self) -> Result<crate::format::FileMetaData> {
408        self.finish()
409    }
410
    /// Flushes any buffered rows, then creates a new row group writer and returns
    /// its column writers.
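    ///
    /// # Example
    ///
    /// A minimal sketch of encoding a row group through the returned column writers
    /// and appending it back with [`Self::append_row_group`], writing to an in-memory
    /// buffer for illustration:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    /// # use parquet::arrow::arrow_writer::{compute_leaves, ArrowWriter};
    /// let col = Arc::new(Int32Array::from_iter_values([1, 2, 3])) as ArrayRef;
    /// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
    /// let schema = batch.schema();
    ///
    /// let mut buffer = Vec::new();
    /// let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None).unwrap();
    ///
    /// // Encode each leaf column, then append the chunks as a new row group
    /// let mut col_writers = writer.get_column_writers().unwrap();
    /// let mut leaf_writers = col_writers.iter_mut();
    /// for (field, column) in schema.fields().iter().zip(batch.columns()) {
    ///     for leaf in compute_leaves(field, column).unwrap() {
    ///         leaf_writers.next().unwrap().write(&leaf).unwrap();
    ///     }
    /// }
    /// let chunks = col_writers
    ///     .into_iter()
    ///     .map(|w| w.close().unwrap())
    ///     .collect();
    /// writer.append_row_group(chunks).unwrap();
    /// writer.close().unwrap();
    /// ```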
412    pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
413        self.flush()?;
414        let in_progress = self
415            .row_group_writer_factory
416            .create_row_group_writer(self.writer.flushed_row_groups().len())?;
417        Ok(in_progress.writers)
418    }
419
420    /// Append the given column chunks to the file as a new row group.
421    pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
422        let mut row_group_writer = self.writer.next_row_group()?;
423        for chunk in chunks {
424            chunk.append_to_row_group(&mut row_group_writer)?;
425        }
426        row_group_writer.close()?;
427        Ok(())
428    }
429}
430
431impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
432    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
433        self.write(batch).map_err(|e| e.into())
434    }
435
436    fn close(self) -> std::result::Result<(), ArrowError> {
437        self.close()?;
438        Ok(())
439    }
440}
441
442/// Arrow-specific configuration settings for writing parquet files.
443///
444/// See [`ArrowWriter`] for how to configure the writer.
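///
/// # Example
///
/// A minimal sketch of configuring an [`ArrowWriter`] with options, writing to an
/// in-memory buffer for illustration (the schema root name chosen here is arbitrary):
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
/// # use parquet::arrow::arrow_writer::{ArrowWriter, ArrowWriterOptions};
/// # use parquet::file::properties::WriterProperties;
/// let col = Arc::new(Int32Array::from_iter_values([1, 2, 3])) as ArrayRef;
/// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
///
/// // skip the embedded arrow schema and use an arbitrary root name
/// let options = ArrowWriterOptions::new()
///     .with_properties(WriterProperties::default())
///     .with_skip_arrow_metadata(true)
///     .with_schema_root("root".to_string());
///
/// let mut buffer = Vec::new();
/// let mut writer =
///     ArrowWriter::try_new_with_options(&mut buffer, batch.schema(), options).unwrap();
/// writer.write(&batch).unwrap();
/// writer.close().unwrap();
/// ```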
445#[derive(Debug, Clone, Default)]
446pub struct ArrowWriterOptions {
447    properties: WriterProperties,
448    skip_arrow_metadata: bool,
449    schema_root: Option<String>,
450}
451
452impl ArrowWriterOptions {
453    /// Creates a new [`ArrowWriterOptions`] with the default settings.
454    pub fn new() -> Self {
455        Self::default()
456    }
457
458    /// Sets the [`WriterProperties`] for writing parquet files.
459    pub fn with_properties(self, properties: WriterProperties) -> Self {
460        Self { properties, ..self }
461    }
462
463    /// Skip encoding the embedded arrow metadata (defaults to `false`)
464    ///
    /// Parquet files generated by the [`ArrowWriter`] contain an embedded Arrow schema
    /// by default.
    ///
    /// Set `skip_arrow_metadata` to `true` to skip encoding the embedded metadata.
469    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
470        Self {
471            skip_arrow_metadata,
472            ..self
473        }
474    }
475
476    /// Set the name of the root parquet schema element (defaults to `"arrow_schema"`)
477    pub fn with_schema_root(self, schema_root: String) -> Self {
478        Self {
479            schema_root: Some(schema_root),
480            ..self
481        }
482    }
483}
484
485/// A single column chunk produced by [`ArrowColumnWriter`]
486#[derive(Default)]
487struct ArrowColumnChunkData {
488    length: usize,
489    data: Vec<Bytes>,
490}
491
492impl Length for ArrowColumnChunkData {
493    fn len(&self) -> u64 {
494        self.length as _
495    }
496}
497
498impl ChunkReader for ArrowColumnChunkData {
499    type T = ArrowColumnChunkReader;
500
501    fn get_read(&self, start: u64) -> Result<Self::T> {
502        assert_eq!(start, 0); // Assume append_column writes all data in one-shot
503        Ok(ArrowColumnChunkReader(
504            self.data.clone().into_iter().peekable(),
505        ))
506    }
507
508    fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
509        unimplemented!()
510    }
511}
512
513/// A [`Read`] for [`ArrowColumnChunkData`]
514struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
515
516impl Read for ArrowColumnChunkReader {
517    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
518        let buffer = loop {
519            match self.0.peek_mut() {
520                Some(b) if b.is_empty() => {
521                    self.0.next();
522                    continue;
523                }
524                Some(b) => break b,
525                None => return Ok(0),
526            }
527        };
528
529        let len = buffer.len().min(out.len());
530        let b = buffer.split_to(len);
531        out[..len].copy_from_slice(&b);
532        Ok(len)
533    }
534}
535
536/// A shared [`ArrowColumnChunkData`]
537///
538/// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access via
539/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows
540type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
541
542#[derive(Default)]
543struct ArrowPageWriter {
544    buffer: SharedColumnChunk,
545    #[cfg(feature = "encryption")]
546    page_encryptor: Option<PageEncryptor>,
547}
548
549impl ArrowPageWriter {
550    #[cfg(feature = "encryption")]
551    pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
552        self.page_encryptor = page_encryptor;
553        self
554    }
555
556    #[cfg(feature = "encryption")]
557    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
558        self.page_encryptor.as_mut()
559    }
560
561    #[cfg(not(feature = "encryption"))]
562    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
563        None
564    }
565}
566
567impl PageWriter for ArrowPageWriter {
568    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
569        let page = match self.page_encryptor_mut() {
570            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
571            None => page,
572        };
573
574        let page_header = page.to_thrift_header()?;
575        let header = {
576            let mut header = Vec::with_capacity(1024);
577
578            match self.page_encryptor_mut() {
579                Some(page_encryptor) => {
580                    page_encryptor.encrypt_page_header(&page_header, &mut header)?;
581                    if page.compressed_page().is_data_page() {
582                        page_encryptor.increment_page();
583                    }
584                }
585                None => {
586                    let mut protocol = TCompactOutputProtocol::new(&mut header);
587                    page_header.write_to_out_protocol(&mut protocol)?;
588                }
589            };
590
591            Bytes::from(header)
592        };
593
594        let mut buf = self.buffer.try_lock().unwrap();
595
596        let data = page.compressed_page().buffer().clone();
597        let compressed_size = data.len() + header.len();
598
599        let mut spec = PageWriteSpec::new();
600        spec.page_type = page.page_type();
601        spec.num_values = page.num_values();
602        spec.uncompressed_size = page.uncompressed_size() + header.len();
603        spec.offset = buf.length as u64;
604        spec.compressed_size = compressed_size;
605        spec.bytes_written = compressed_size as u64;
606
607        buf.length += compressed_size;
608        buf.data.push(header);
609        buf.data.push(data);
610
611        Ok(spec)
612    }
613
614    fn close(&mut self) -> Result<()> {
615        Ok(())
616    }
617}
618
619/// A leaf column that can be encoded by [`ArrowColumnWriter`]
620#[derive(Debug)]
621pub struct ArrowLeafColumn(ArrayLevels);
622
623/// Computes the [`ArrowLeafColumn`] for a potentially nested [`ArrayRef`]
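///
/// # Example
///
/// A minimal sketch for a flat (non-nested) field; nested types such as structs or
/// lists may produce more than one leaf per field:
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int32Array};
/// # use arrow_schema::{DataType, Field};
/// # use parquet::arrow::arrow_writer::compute_leaves;
/// let field = Field::new("i32", DataType::Int32, false);
/// let array = Arc::new(Int32Array::from_iter_values([1, 2, 3])) as ArrayRef;
/// // a flat Int32 field maps to exactly one leaf column
/// let leaves = compute_leaves(&field, &array).unwrap();
/// assert_eq!(leaves.len(), 1);
/// ```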
624pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
625    let levels = calculate_array_levels(array, field)?;
626    Ok(levels.into_iter().map(ArrowLeafColumn).collect())
627}
628
629/// The data for a single column chunk, see [`ArrowColumnWriter`]
630pub struct ArrowColumnChunk {
631    data: ArrowColumnChunkData,
632    close: ColumnCloseResult,
633}
634
635impl std::fmt::Debug for ArrowColumnChunk {
636    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
637        f.debug_struct("ArrowColumnChunk")
638            .field("length", &self.data.length)
639            .finish_non_exhaustive()
640    }
641}
642
643impl ArrowColumnChunk {
644    /// Calls [`SerializedRowGroupWriter::append_column`] with this column's data
645    pub fn append_to_row_group<W: Write + Send>(
646        self,
647        writer: &mut SerializedRowGroupWriter<'_, W>,
648    ) -> Result<()> {
649        writer.append_column(&self.data, self.close)
650    }
651}
652
653/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
654///
655/// Note: This is a low-level interface for applications that require
656/// fine-grained control of encoding (e.g. encoding using multiple threads),
657/// see [`ArrowWriter`] for a higher-level interface
658///
/// # Example: Encoding two Arrow Arrays in Parallel
660/// ```
661/// // The arrow schema
662/// # use std::sync::Arc;
663/// # use arrow_array::*;
664/// # use arrow_schema::*;
665/// # use parquet::arrow::ArrowSchemaConverter;
666/// # use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, get_column_writers, ArrowColumnChunk};
667/// # use parquet::file::properties::WriterProperties;
668/// # use parquet::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
669/// #
670/// let schema = Arc::new(Schema::new(vec![
671///     Field::new("i32", DataType::Int32, false),
672///     Field::new("f32", DataType::Float32, false),
673/// ]));
674///
675/// // Compute the parquet schema
676/// let props = Arc::new(WriterProperties::default());
677/// let parquet_schema = ArrowSchemaConverter::new()
678///   .with_coerce_types(props.coerce_types())
679///   .convert(&schema)
680///   .unwrap();
681///
682/// // Create writers for each of the leaf columns
683/// let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
684///
685/// // Spawn a worker thread for each column
686/// //
/// // Note: This is for demonstration purposes; a thread pool (e.g. rayon or tokio) would be better.
/// // The `map` produces an iterator of `(thread handle, send channel)` tuples.
689/// let mut workers: Vec<_> = col_writers
690///     .into_iter()
691///     .map(|mut col_writer| {
692///         let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
693///         let handle = std::thread::spawn(move || {
694///             // receive Arrays to encode via the channel
695///             for col in recv {
696///                 col_writer.write(&col)?;
697///             }
698///             // once the input is complete, close the writer
699///             // to return the newly created ArrowColumnChunk
700///             col_writer.close()
701///         });
702///         (handle, send)
703///     })
704///     .collect();
705///
706/// // Create parquet writer
707/// let root_schema = parquet_schema.root_schema_ptr();
708/// // write to memory in the example, but this could be a File
709/// let mut out = Vec::with_capacity(1024);
710/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone())
711///   .unwrap();
712///
713/// // Start row group
714/// let mut row_group_writer: SerializedRowGroupWriter<'_, _> = writer
715///   .next_row_group()
716///   .unwrap();
717///
718/// // Create some example input columns to encode
719/// let to_write = vec![
720///     Arc::new(Int32Array::from_iter_values([1, 2, 3])) as _,
721///     Arc::new(Float32Array::from_iter_values([1., 45., -1.])) as _,
722/// ];
723///
724/// // Send the input columns to the workers
725/// let mut worker_iter = workers.iter_mut();
726/// for (arr, field) in to_write.iter().zip(&schema.fields) {
727///     for leaves in compute_leaves(field, arr).unwrap() {
728///         worker_iter.next().unwrap().1.send(leaves).unwrap();
729///     }
730/// }
731///
732/// // Wait for the workers to complete encoding, and append
733/// // the resulting column chunks to the row group (and the file)
734/// for (handle, send) in workers {
735///     drop(send); // Drop send side to signal termination
736///     // wait for the worker to send the completed chunk
737///     let chunk: ArrowColumnChunk = handle.join().unwrap().unwrap();
738///     chunk.append_to_row_group(&mut row_group_writer).unwrap();
739/// }
740/// // Close the row group which writes to the underlying file
741/// row_group_writer.close().unwrap();
742///
743/// let metadata = writer.close().unwrap();
744/// assert_eq!(metadata.num_rows, 3);
745/// ```
746pub struct ArrowColumnWriter {
747    writer: ArrowColumnWriterImpl,
748    chunk: SharedColumnChunk,
749}
750
751impl std::fmt::Debug for ArrowColumnWriter {
752    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
753        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
754    }
755}
756
757enum ArrowColumnWriterImpl {
758    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
759    Column(ColumnWriter<'static>),
760}
761
762impl ArrowColumnWriter {
763    /// Write an [`ArrowLeafColumn`]
764    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
765        match &mut self.writer {
766            ArrowColumnWriterImpl::Column(c) => {
767                write_leaf(c, &col.0)?;
768            }
769            ArrowColumnWriterImpl::ByteArray(c) => {
770                write_primitive(c, col.0.array().as_ref(), &col.0)?;
771            }
772        }
773        Ok(())
774    }
775
776    /// Close this column returning the written [`ArrowColumnChunk`]
777    pub fn close(self) -> Result<ArrowColumnChunk> {
778        let close = match self.writer {
779            ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
780            ArrowColumnWriterImpl::Column(c) => c.close()?,
781        };
782        let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
783        let data = chunk.into_inner().unwrap();
784        Ok(ArrowColumnChunk { data, close })
785    }
786
787    /// Returns the estimated total memory usage by the writer.
788    ///
    /// Unlike [`Self::get_estimated_total_bytes`], this is an estimate
    /// of the current memory usage and not the anticipated encoded size.
791    ///
792    /// This includes:
793    /// 1. Data buffered in encoded form
794    /// 2. Data buffered in un-encoded form (e.g. `usize` dictionary keys)
795    ///
796    /// This value should be greater than or equal to [`Self::get_estimated_total_bytes`]
797    pub fn memory_size(&self) -> usize {
798        match &self.writer {
799            ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
800            ArrowColumnWriterImpl::Column(c) => c.memory_size(),
801        }
802    }
803
804    /// Returns the estimated total encoded bytes for this column writer.
805    ///
806    /// This includes:
807    /// 1. Data buffered in encoded form
808    /// 2. An estimate of how large the data buffered in un-encoded form would be once encoded
809    ///
810    /// This value should be less than or equal to [`Self::memory_size`]
811    pub fn get_estimated_total_bytes(&self) -> usize {
812        match &self.writer {
813            ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
814            ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
815        }
816    }
817}
818
819/// Encodes [`RecordBatch`] to a parquet row group
820struct ArrowRowGroupWriter {
821    writers: Vec<ArrowColumnWriter>,
822    schema: SchemaRef,
823    buffered_rows: usize,
824}
825
826impl ArrowRowGroupWriter {
827    fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
828        Self {
829            writers,
830            schema: arrow.clone(),
831            buffered_rows: 0,
832        }
833    }
834
835    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
836        self.buffered_rows += batch.num_rows();
837        let mut writers = self.writers.iter_mut();
838        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
839            for leaf in compute_leaves(field.as_ref(), column)? {
840                writers.next().unwrap().write(&leaf)?
841            }
842        }
843        Ok(())
844    }
845
846    fn close(self) -> Result<Vec<ArrowColumnChunk>> {
847        self.writers
848            .into_iter()
849            .map(|writer| writer.close())
850            .collect()
851    }
852}
853
854struct ArrowRowGroupWriterFactory {
855    schema: SchemaDescriptor,
856    arrow_schema: SchemaRef,
857    props: WriterPropertiesPtr,
858    #[cfg(feature = "encryption")]
859    file_encryptor: Option<Arc<FileEncryptor>>,
860}
861
862impl ArrowRowGroupWriterFactory {
863    #[cfg(feature = "encryption")]
864    fn new<W: Write + Send>(
865        file_writer: &SerializedFileWriter<W>,
866        schema: SchemaDescriptor,
867        arrow_schema: SchemaRef,
868        props: WriterPropertiesPtr,
869    ) -> Self {
870        Self {
871            schema,
872            arrow_schema,
873            props,
874            file_encryptor: file_writer.file_encryptor(),
875        }
876    }
877
878    #[cfg(not(feature = "encryption"))]
879    fn new<W: Write + Send>(
880        _file_writer: &SerializedFileWriter<W>,
881        schema: SchemaDescriptor,
882        arrow_schema: SchemaRef,
883        props: WriterPropertiesPtr,
884    ) -> Self {
885        Self {
886            schema,
887            arrow_schema,
888            props,
889        }
890    }
891
892    #[cfg(feature = "encryption")]
893    fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
894        let writers = get_column_writers_with_encryptor(
895            &self.schema,
896            &self.props,
897            &self.arrow_schema,
898            self.file_encryptor.clone(),
899            row_group_index,
900        )?;
901        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
902    }
903
904    #[cfg(not(feature = "encryption"))]
905    fn create_row_group_writer(&self, _row_group_index: usize) -> Result<ArrowRowGroupWriter> {
906        let writers = get_column_writers(&self.schema, &self.props, &self.arrow_schema)?;
907        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
908    }
909}
910
/// Returns the [`ArrowColumnWriter`]s for a given schema
912pub fn get_column_writers(
913    parquet: &SchemaDescriptor,
914    props: &WriterPropertiesPtr,
915    arrow: &SchemaRef,
916) -> Result<Vec<ArrowColumnWriter>> {
917    let mut writers = Vec::with_capacity(arrow.fields.len());
918    let mut leaves = parquet.columns().iter();
919    let column_factory = ArrowColumnWriterFactory::new();
920    for field in &arrow.fields {
921        column_factory.get_arrow_column_writer(
922            field.data_type(),
923            props,
924            &mut leaves,
925            &mut writers,
926        )?;
927    }
928    Ok(writers)
929}
930
/// Returns the [`ArrowColumnWriter`]s for a given schema, supporting columnar encryption
932#[cfg(feature = "encryption")]
933fn get_column_writers_with_encryptor(
934    parquet: &SchemaDescriptor,
935    props: &WriterPropertiesPtr,
936    arrow: &SchemaRef,
937    file_encryptor: Option<Arc<FileEncryptor>>,
938    row_group_index: usize,
939) -> Result<Vec<ArrowColumnWriter>> {
940    let mut writers = Vec::with_capacity(arrow.fields.len());
941    let mut leaves = parquet.columns().iter();
942    let column_factory =
943        ArrowColumnWriterFactory::new().with_file_encryptor(row_group_index, file_encryptor);
944    for field in &arrow.fields {
945        column_factory.get_arrow_column_writer(
946            field.data_type(),
947            props,
948            &mut leaves,
949            &mut writers,
950        )?;
951    }
952    Ok(writers)
953}
954
955/// Gets [`ArrowColumnWriter`] instances for different data types
956struct ArrowColumnWriterFactory {
957    #[cfg(feature = "encryption")]
958    row_group_index: usize,
959    #[cfg(feature = "encryption")]
960    file_encryptor: Option<Arc<FileEncryptor>>,
961}
962
963impl ArrowColumnWriterFactory {
964    pub fn new() -> Self {
965        Self {
966            #[cfg(feature = "encryption")]
967            row_group_index: 0,
968            #[cfg(feature = "encryption")]
969            file_encryptor: None,
970        }
971    }
972
973    #[cfg(feature = "encryption")]
974    pub fn with_file_encryptor(
975        mut self,
976        row_group_index: usize,
977        file_encryptor: Option<Arc<FileEncryptor>>,
978    ) -> Self {
979        self.row_group_index = row_group_index;
980        self.file_encryptor = file_encryptor;
981        self
982    }
983
984    #[cfg(feature = "encryption")]
985    fn create_page_writer(
986        &self,
987        column_descriptor: &ColumnDescPtr,
988        column_index: usize,
989    ) -> Result<Box<ArrowPageWriter>> {
990        let column_path = column_descriptor.path().string();
991        let page_encryptor = PageEncryptor::create_if_column_encrypted(
992            &self.file_encryptor,
993            self.row_group_index,
994            column_index,
995            &column_path,
996        )?;
997        Ok(Box::new(
998            ArrowPageWriter::default().with_encryptor(page_encryptor),
999        ))
1000    }
1001
1002    #[cfg(not(feature = "encryption"))]
1003    fn create_page_writer(
1004        &self,
1005        _column_descriptor: &ColumnDescPtr,
1006        _column_index: usize,
1007    ) -> Result<Box<ArrowPageWriter>> {
1008        Ok(Box::<ArrowPageWriter>::default())
1009    }
1010
1011    /// Gets the [`ArrowColumnWriter`] for the given `data_type`
1012    fn get_arrow_column_writer(
1013        &self,
1014        data_type: &ArrowDataType,
1015        props: &WriterPropertiesPtr,
1016        leaves: &mut Iter<'_, ColumnDescPtr>,
1017        out: &mut Vec<ArrowColumnWriter>,
1018    ) -> Result<()> {
1019        let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1020            let page_writer = self.create_page_writer(desc, out.len())?;
1021            let chunk = page_writer.buffer.clone();
1022            let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
1023            Ok(ArrowColumnWriter {
1024                chunk,
1025                writer: ArrowColumnWriterImpl::Column(writer),
1026            })
1027        };
1028
1029        let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1030            let page_writer = self.create_page_writer(desc, out.len())?;
1031            let chunk = page_writer.buffer.clone();
1032            let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
1033            Ok(ArrowColumnWriter {
1034                chunk,
1035                writer: ArrowColumnWriterImpl::ByteArray(writer),
1036            })
1037        };
1038
1039        match data_type {
1040            _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
1041            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => out.push(col(leaves.next().unwrap())?),
1042            ArrowDataType::LargeBinary
1043            | ArrowDataType::Binary
1044            | ArrowDataType::Utf8
1045            | ArrowDataType::LargeUtf8
1046            | ArrowDataType::BinaryView
1047            | ArrowDataType::Utf8View => {
1048                out.push(bytes(leaves.next().unwrap())?)
1049            }
1050            ArrowDataType::List(f)
1051            | ArrowDataType::LargeList(f)
1052            | ArrowDataType::FixedSizeList(f, _) => {
1053                self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
1054            }
1055            ArrowDataType::Struct(fields) => {
1056                for field in fields {
1057                    self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
1058                }
1059            }
1060            ArrowDataType::Map(f, _) => match f.data_type() {
1061                ArrowDataType::Struct(f) => {
1062                    self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
1063                    self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
1064                }
1065                _ => unreachable!("invalid map type"),
1066            }
1067            ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1068                ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Binary | ArrowDataType::LargeBinary => {
1069                    out.push(bytes(leaves.next().unwrap())?)
1070                }
1071                ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
1072                    out.push(bytes(leaves.next().unwrap())?)
1073                }
1074                ArrowDataType::FixedSizeBinary(_) => {
1075                    out.push(bytes(leaves.next().unwrap())?)
1076                }
1077                _ => {
1078                    out.push(col(leaves.next().unwrap())?)
1079                }
1080            }
1081            _ => return Err(ParquetError::NYI(
1082                format!(
1083                    "Attempting to write an Arrow type {data_type:?} to parquet that is not yet implemented"
1084                )
1085            ))
1086        }
1087        Ok(())
1088    }
1089}
1090
1091fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
1092    let column = levels.array().as_ref();
1093    let indices = levels.non_null_indices();
1094    match writer {
1095        ColumnWriter::Int32ColumnWriter(ref mut typed) => {
1096            match column.data_type() {
1097                ArrowDataType::Date64 => {
1098                    // If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
1099                    let array = arrow_cast::cast(column, &ArrowDataType::Date32)?;
1100                    let array = arrow_cast::cast(&array, &ArrowDataType::Int32)?;
1101
1102                    let array = array.as_primitive::<Int32Type>();
1103                    write_primitive(typed, array.values(), levels)
1104                }
1105                ArrowDataType::UInt32 => {
1106                    let values = column.as_primitive::<UInt32Type>().values();
                    // follow the C++ implementation and use an overflow/reinterpret cast from u32 to i32, which will map
1108                    // `(i32::MAX as u32)..u32::MAX` to `i32::MIN..0`
1109                    let array = values.inner().typed_data::<i32>();
1110                    write_primitive(typed, array, levels)
1111                }
1112                ArrowDataType::Decimal32(_, _) => {
1113                    let array = column
1114                        .as_primitive::<Decimal32Type>()
1115                        .unary::<_, Int32Type>(|v| v);
1116                    write_primitive(typed, array.values(), levels)
1117                }
1118                ArrowDataType::Decimal64(_, _) => {
1119                    // use the int32 to represent the decimal with low precision
1120                    let array = column
1121                        .as_primitive::<Decimal64Type>()
1122                        .unary::<_, Int32Type>(|v| v as i32);
1123                    write_primitive(typed, array.values(), levels)
1124                }
1125                ArrowDataType::Decimal128(_, _) => {
1126                    // use the int32 to represent the decimal with low precision
1127                    let array = column
1128                        .as_primitive::<Decimal128Type>()
1129                        .unary::<_, Int32Type>(|v| v as i32);
1130                    write_primitive(typed, array.values(), levels)
1131                }
1132                ArrowDataType::Decimal256(_, _) => {
1133                    // use the int32 to represent the decimal with low precision
1134                    let array = column
1135                        .as_primitive::<Decimal256Type>()
1136                        .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1137                    write_primitive(typed, array.values(), levels)
1138                }
1139                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1140                    ArrowDataType::Decimal32(_, _) => {
1141                        let array = arrow_cast::cast(column, value_type)?;
1142                        let array = array
1143                            .as_primitive::<Decimal32Type>()
1144                            .unary::<_, Int32Type>(|v| v);
1145                        write_primitive(typed, array.values(), levels)
1146                    }
1147                    ArrowDataType::Decimal64(_, _) => {
1148                        let array = arrow_cast::cast(column, value_type)?;
1149                        let array = array
1150                            .as_primitive::<Decimal64Type>()
1151                            .unary::<_, Int32Type>(|v| v as i32);
1152                        write_primitive(typed, array.values(), levels)
1153                    }
1154                    ArrowDataType::Decimal128(_, _) => {
1155                        let array = arrow_cast::cast(column, value_type)?;
1156                        let array = array
1157                            .as_primitive::<Decimal128Type>()
1158                            .unary::<_, Int32Type>(|v| v as i32);
1159                        write_primitive(typed, array.values(), levels)
1160                    }
1161                    ArrowDataType::Decimal256(_, _) => {
1162                        let array = arrow_cast::cast(column, value_type)?;
1163                        let array = array
1164                            .as_primitive::<Decimal256Type>()
1165                            .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1166                        write_primitive(typed, array.values(), levels)
1167                    }
1168                    _ => {
1169                        let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
1170                        let array = array.as_primitive::<Int32Type>();
1171                        write_primitive(typed, array.values(), levels)
1172                    }
1173                },
1174                _ => {
1175                    let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
1176                    let array = array.as_primitive::<Int32Type>();
1177                    write_primitive(typed, array.values(), levels)
1178                }
1179            }
1180        }
1181        ColumnWriter::BoolColumnWriter(ref mut typed) => {
1182            let array = column.as_boolean();
1183            typed.write_batch(
1184                get_bool_array_slice(array, indices).as_slice(),
1185                levels.def_levels(),
1186                levels.rep_levels(),
1187            )
1188        }
1189        ColumnWriter::Int64ColumnWriter(ref mut typed) => {
1190            match column.data_type() {
1191                ArrowDataType::Date64 => {
1192                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1193
1194                    let array = array.as_primitive::<Int64Type>();
1195                    write_primitive(typed, array.values(), levels)
1196                }
1197                ArrowDataType::Int64 => {
1198                    let array = column.as_primitive::<Int64Type>();
1199                    write_primitive(typed, array.values(), levels)
1200                }
1201                ArrowDataType::UInt64 => {
1202                    let values = column.as_primitive::<UInt64Type>().values();
                    // follow the C++ implementation and use an overflow/reinterpret cast from u64 to i64, which will map
1204                    // `(i64::MAX as u64)..u64::MAX` to `i64::MIN..0`
1205                    let array = values.inner().typed_data::<i64>();
1206                    write_primitive(typed, array, levels)
1207                }
1208                ArrowDataType::Decimal64(_, _) => {
1209                    let array = column
1210                        .as_primitive::<Decimal64Type>()
1211                        .unary::<_, Int64Type>(|v| v);
1212                    write_primitive(typed, array.values(), levels)
1213                }
1214                ArrowDataType::Decimal128(_, _) => {
1215                    // use the int64 to represent the decimal with low precision
1216                    let array = column
1217                        .as_primitive::<Decimal128Type>()
1218                        .unary::<_, Int64Type>(|v| v as i64);
1219                    write_primitive(typed, array.values(), levels)
1220                }
1221                ArrowDataType::Decimal256(_, _) => {
1222                    // use the int64 to represent the decimal with low precision
1223                    let array = column
1224                        .as_primitive::<Decimal256Type>()
1225                        .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1226                    write_primitive(typed, array.values(), levels)
1227                }
1228                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1229                    ArrowDataType::Decimal64(_, _) => {
1230                        let array = arrow_cast::cast(column, value_type)?;
1231                        let array = array
1232                            .as_primitive::<Decimal64Type>()
1233                            .unary::<_, Int64Type>(|v| v);
1234                        write_primitive(typed, array.values(), levels)
1235                    }
1236                    ArrowDataType::Decimal128(_, _) => {
1237                        let array = arrow_cast::cast(column, value_type)?;
1238                        let array = array
1239                            .as_primitive::<Decimal128Type>()
1240                            .unary::<_, Int64Type>(|v| v as i64);
1241                        write_primitive(typed, array.values(), levels)
1242                    }
1243                    ArrowDataType::Decimal256(_, _) => {
1244                        let array = arrow_cast::cast(column, value_type)?;
1245                        let array = array
1246                            .as_primitive::<Decimal256Type>()
1247                            .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1248                        write_primitive(typed, array.values(), levels)
1249                    }
1250                    _ => {
1251                        let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1252                        let array = array.as_primitive::<Int64Type>();
1253                        write_primitive(typed, array.values(), levels)
1254                    }
1255                },
1256                _ => {
1257                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1258                    let array = array.as_primitive::<Int64Type>();
1259                    write_primitive(typed, array.values(), levels)
1260                }
1261            }
1262        }
1263        ColumnWriter::Int96ColumnWriter(ref mut _typed) => {
1264            unreachable!("Currently unreachable because data type not supported")
1265        }
1266        ColumnWriter::FloatColumnWriter(ref mut typed) => {
1267            let array = column.as_primitive::<Float32Type>();
1268            write_primitive(typed, array.values(), levels)
1269        }
1270        ColumnWriter::DoubleColumnWriter(ref mut typed) => {
1271            let array = column.as_primitive::<Float64Type>();
1272            write_primitive(typed, array.values(), levels)
1273        }
1274        ColumnWriter::ByteArrayColumnWriter(_) => {
1275            unreachable!("should use ByteArrayWriter")
1276        }
1277        ColumnWriter::FixedLenByteArrayColumnWriter(ref mut typed) => {
1278            let bytes = match column.data_type() {
1279                ArrowDataType::Interval(interval_unit) => match interval_unit {
1280                    IntervalUnit::YearMonth => {
1281                        let array = column
1282                            .as_any()
1283                            .downcast_ref::<arrow_array::IntervalYearMonthArray>()
1284                            .unwrap();
1285                        get_interval_ym_array_slice(array, indices)
1286                    }
1287                    IntervalUnit::DayTime => {
1288                        let array = column
1289                            .as_any()
1290                            .downcast_ref::<arrow_array::IntervalDayTimeArray>()
1291                            .unwrap();
1292                        get_interval_dt_array_slice(array, indices)
1293                    }
1294                    _ => {
1295                        return Err(ParquetError::NYI(
1296                            format!(
1297                                "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
1298                            )
1299                        ));
1300                    }
1301                },
1302                ArrowDataType::FixedSizeBinary(_) => {
1303                    let array = column
1304                        .as_any()
1305                        .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
1306                        .unwrap();
1307                    get_fsb_array_slice(array, indices)
1308                }
1309                ArrowDataType::Decimal32(_, _) => {
1310                    let array = column.as_primitive::<Decimal32Type>();
1311                    get_decimal_32_array_slice(array, indices)
1312                }
1313                ArrowDataType::Decimal64(_, _) => {
1314                    let array = column.as_primitive::<Decimal64Type>();
1315                    get_decimal_64_array_slice(array, indices)
1316                }
1317                ArrowDataType::Decimal128(_, _) => {
1318                    let array = column.as_primitive::<Decimal128Type>();
1319                    get_decimal_128_array_slice(array, indices)
1320                }
1321                ArrowDataType::Decimal256(_, _) => {
1322                    let array = column
1323                        .as_any()
1324                        .downcast_ref::<arrow_array::Decimal256Array>()
1325                        .unwrap();
1326                    get_decimal_256_array_slice(array, indices)
1327                }
1328                ArrowDataType::Float16 => {
1329                    let array = column.as_primitive::<Float16Type>();
1330                    get_float_16_array_slice(array, indices)
1331                }
1332                _ => {
1333                    return Err(ParquetError::NYI(
1334                        "Attempting to write an Arrow type that is not yet implemented".to_string(),
1335                    ));
1336                }
1337            };
1338            typed.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels())
1339        }
1340    }
1341}
1342
1343fn write_primitive<E: ColumnValueEncoder>(
1344    writer: &mut GenericColumnWriter<E>,
1345    values: &E::Values,
1346    levels: &ArrayLevels,
1347) -> Result<usize> {
1348    writer.write_batch_internal(
1349        values,
1350        Some(levels.non_null_indices()),
1351        levels.def_levels(),
1352        levels.rep_levels(),
1353        None,
1354        None,
1355        None,
1356    )
1357}
1358
1359fn get_bool_array_slice(array: &arrow_array::BooleanArray, indices: &[usize]) -> Vec<bool> {
1360    let mut values = Vec::with_capacity(indices.len());
1361    for i in indices {
1362        values.push(array.value(*i))
1363    }
1364    values
1365}
1366
1367/// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each).
1368/// An Arrow YearMonth interval only stores months, thus only the first 4 bytes are populated.
1369fn get_interval_ym_array_slice(
1370    array: &arrow_array::IntervalYearMonthArray,
1371    indices: &[usize],
1372) -> Vec<FixedLenByteArray> {
1373    let mut values = Vec::with_capacity(indices.len());
1374    for i in indices {
1375        let mut value = array.value(*i).to_le_bytes().to_vec();
1376        let mut suffix = vec![0; 8];
1377        value.append(&mut suffix);
1378        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1379    }
1380    values
1381}
1382
1383/// Returns 12-byte values representing 3 values of months, days and milliseconds (4 bytes each).
1384/// An Arrow DayTime interval only stores days and millis, thus the first 4 bytes are not populated.
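///
/// For example, an interval of 1 day and 250 milliseconds is written as four zero
/// bytes (months), then `[1, 0, 0, 0]` (days, little-endian), then `[250, 0, 0, 0]`
/// (milliseconds, little-endian).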
1385fn get_interval_dt_array_slice(
1386    array: &arrow_array::IntervalDayTimeArray,
1387    indices: &[usize],
1388) -> Vec<FixedLenByteArray> {
1389    let mut values = Vec::with_capacity(indices.len());
1390    for i in indices {
1391        let mut out = [0; 12];
1392        let value = array.value(*i);
1393        out[4..8].copy_from_slice(&value.days.to_le_bytes());
1394        out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1395        values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1396    }
1397    values
1398}
1399
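/// Converts the decimal values at the given indices into FixedLenByteArrays whose
/// width is given by `decimal_length_from_precision`: each value's big-endian bytes
/// are truncated to the minimal length that can hold the array's precision. The
/// `Decimal64`/`Decimal128`/`Decimal256` helpers below follow the same pattern.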
1400fn get_decimal_32_array_slice(
1401    array: &arrow_array::Decimal32Array,
1402    indices: &[usize],
1403) -> Vec<FixedLenByteArray> {
1404    let mut values = Vec::with_capacity(indices.len());
1405    let size = decimal_length_from_precision(array.precision());
1406    for i in indices {
1407        let as_be_bytes = array.value(*i).to_be_bytes();
1408        let resized_value = as_be_bytes[(4 - size)..].to_vec();
1409        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1410    }
1411    values
1412}
1413
1414fn get_decimal_64_array_slice(
1415    array: &arrow_array::Decimal64Array,
1416    indices: &[usize],
1417) -> Vec<FixedLenByteArray> {
1418    let mut values = Vec::with_capacity(indices.len());
1419    let size = decimal_length_from_precision(array.precision());
1420    for i in indices {
1421        let as_be_bytes = array.value(*i).to_be_bytes();
1422        let resized_value = as_be_bytes[(8 - size)..].to_vec();
1423        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1424    }
1425    values
1426}
1427
1428fn get_decimal_128_array_slice(
1429    array: &arrow_array::Decimal128Array,
1430    indices: &[usize],
1431) -> Vec<FixedLenByteArray> {
1432    let mut values = Vec::with_capacity(indices.len());
1433    let size = decimal_length_from_precision(array.precision());
1434    for i in indices {
1435        let as_be_bytes = array.value(*i).to_be_bytes();
1436        let resized_value = as_be_bytes[(16 - size)..].to_vec();
1437        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1438    }
1439    values
1440}
1441
1442fn get_decimal_256_array_slice(
1443    array: &arrow_array::Decimal256Array,
1444    indices: &[usize],
1445) -> Vec<FixedLenByteArray> {
1446    let mut values = Vec::with_capacity(indices.len());
1447    let size = decimal_length_from_precision(array.precision());
1448    for i in indices {
1449        let as_be_bytes = array.value(*i).to_be_bytes();
1450        let resized_value = as_be_bytes[(32 - size)..].to_vec();
1451        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1452    }
1453    values
1454}
1455
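/// Converts the `f16` values at the given indices into 2-byte little-endian
/// FixedLenByteArrays.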
1456fn get_float_16_array_slice(
1457    array: &arrow_array::Float16Array,
1458    indices: &[usize],
1459) -> Vec<FixedLenByteArray> {
1460    let mut values = Vec::with_capacity(indices.len());
1461    for i in indices {
1462        let value = array.value(*i).to_le_bytes().to_vec();
1463        values.push(FixedLenByteArray::from(ByteArray::from(value)));
1464    }
1465    values
1466}
1467
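/// Copies the fixed-size binary values at the given indices into FixedLenByteArrays.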
1468fn get_fsb_array_slice(
1469    array: &arrow_array::FixedSizeBinaryArray,
1470    indices: &[usize],
1471) -> Vec<FixedLenByteArray> {
1472    let mut values = Vec::with_capacity(indices.len());
1473    for i in indices {
1474        let value = array.value(*i).to_vec();
1475        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1476    }
1477    values
1478}
1479
1480#[cfg(test)]
1481mod tests {
1482    use super::*;
1483
1484    use std::fs::File;
1485    use std::io::Seek;
1486
1487    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1488    use crate::arrow::ARROW_SCHEMA_META_KEY;
1489    use crate::column::page::{Page, PageReader};
1490    use crate::file::page_encoding_stats::PageEncodingStats;
1491    use crate::file::reader::SerializedPageReader;
1492    use crate::format::PageHeader;
1493    use crate::schema::types::ColumnPath;
1494    use crate::thrift::TCompactSliceInputProtocol;
1495    use arrow::datatypes::ToByteSlice;
1496    use arrow::datatypes::{DataType, Schema};
1497    use arrow::error::Result as ArrowResult;
1498    use arrow::util::data_gen::create_random_array;
1499    use arrow::util::pretty::pretty_format_batches;
1500    use arrow::{array::*, buffer::Buffer};
1501    use arrow_buffer::{i256, IntervalDayTime, IntervalMonthDayNano, NullBuffer};
1502    use arrow_schema::Fields;
1503    use half::f16;
1504    use num::{FromPrimitive, ToPrimitive};
1505    use tempfile::tempfile;
1506
1507    use crate::basic::Encoding;
1508    use crate::data_type::AsBytes;
1509    use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
1510    use crate::file::page_index::index::Index;
1511    use crate::file::properties::{
1512        BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1513    };
1514    use crate::file::serialized_reader::ReadOptionsBuilder;
1515    use crate::file::{
1516        reader::{FileReader, SerializedFileReader},
1517        statistics::Statistics,
1518    };
1519
1520    #[test]
1521    fn arrow_writer() {
1522        // define schema
1523        let schema = Schema::new(vec![
1524            Field::new("a", DataType::Int32, false),
1525            Field::new("b", DataType::Int32, true),
1526        ]);
1527
1528        // create some data
1529        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1530        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1531
1532        // build a record batch
1533        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
1534
1535        roundtrip(batch, Some(SMALL_SIZE / 2));
1536    }
1537
1538    fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1539        let mut buffer = vec![];
1540
1541        let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
1542        writer.write(expected_batch).unwrap();
1543        writer.close().unwrap();
1544
1545        buffer
1546    }
1547
1548    fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1549        let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
1550        writer.write(expected_batch).unwrap();
1551        writer.into_inner().unwrap()
1552    }
1553
1554    #[test]
1555    fn roundtrip_bytes() {
1556        // define schema
1557        let schema = Arc::new(Schema::new(vec![
1558            Field::new("a", DataType::Int32, false),
1559            Field::new("b", DataType::Int32, true),
1560        ]));
1561
1562        // create some data
1563        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1564        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1565
1566        // build a record batch
1567        let expected_batch =
1568            RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
1569
1570        for buffer in [
1571            get_bytes_after_close(schema.clone(), &expected_batch),
1572            get_bytes_by_into_inner(schema, &expected_batch),
1573        ] {
1574            let cursor = Bytes::from(buffer);
1575            let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
1576
1577            let actual_batch = record_batch_reader
1578                .next()
1579                .expect("No batch found")
1580                .expect("Unable to get batch");
1581
1582            assert_eq!(expected_batch.schema(), actual_batch.schema());
1583            assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
1584            assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
1585            for i in 0..expected_batch.num_columns() {
1586                let expected_data = expected_batch.column(i).to_data();
1587                let actual_data = actual_batch.column(i).to_data();
1588
1589                assert_eq!(expected_data, actual_data);
1590            }
1591        }
1592    }
1593
1594    #[test]
1595    fn arrow_writer_non_null() {
1596        // define schema
1597        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1598
1599        // create some data
1600        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1601
1602        // build a record batch
1603        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1604
1605        roundtrip(batch, Some(SMALL_SIZE / 2));
1606    }
1607
1608    #[test]
1609    fn arrow_writer_list() {
1610        // define schema
1611        let schema = Schema::new(vec![Field::new(
1612            "a",
1613            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1614            true,
1615        )]);
1616
1617        // create some data
1618        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1619
1620        // Construct a buffer for value offsets, for the nested array:
1621        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
1622        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1623
1624        // Construct a list array from the above two
1625        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1626            DataType::Int32,
1627            false,
1628        ))))
1629        .len(5)
1630        .add_buffer(a_value_offsets)
1631        .add_child_data(a_values.into_data())
1632        .null_bit_buffer(Some(Buffer::from([0b00011011])))
1633        .build()
1634        .unwrap();
1635        let a = ListArray::from(a_list_data);
1636
1637        // build a record batch
1638        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1639
1640        assert_eq!(batch.column(0).null_count(), 1);
1641
1642        // This test fails if the max row group size is less than the batch's length
1643        // see https://github.com/apache/arrow-rs/issues/518
1644        roundtrip(batch, None);
1645    }
1646
1647    #[test]
1648    fn arrow_writer_list_non_null() {
1649        // define schema
1650        let schema = Schema::new(vec![Field::new(
1651            "a",
1652            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1653            false,
1654        )]);
1655
1656        // create some data
1657        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1658
1659        // Construct a buffer for value offsets, for the nested array:
1660        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
1661        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1662
1663        // Construct a list array from the above two
1664        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1665            DataType::Int32,
1666            false,
1667        ))))
1668        .len(5)
1669        .add_buffer(a_value_offsets)
1670        .add_child_data(a_values.into_data())
1671        .build()
1672        .unwrap();
1673        let a = ListArray::from(a_list_data);
1674
1675        // build a record batch
1676        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1677
1678        // This test fails if the max row group size is less than the batch's length
1679        // see https://github.com/apache/arrow-rs/issues/518
1680        assert_eq!(batch.column(0).null_count(), 0);
1681
1682        roundtrip(batch, None);
1683    }
1684
1685    #[test]
1686    fn arrow_writer_binary() {
1687        let string_field = Field::new("a", DataType::Utf8, false);
1688        let binary_field = Field::new("b", DataType::Binary, false);
1689        let schema = Schema::new(vec![string_field, binary_field]);
1690
1691        let raw_string_values = vec!["foo", "bar", "baz", "quux"];
1692        let raw_binary_values = [
1693            b"foo".to_vec(),
1694            b"bar".to_vec(),
1695            b"baz".to_vec(),
1696            b"quux".to_vec(),
1697        ];
1698        let raw_binary_value_refs = raw_binary_values
1699            .iter()
1700            .map(|x| x.as_slice())
1701            .collect::<Vec<_>>();
1702
1703        let string_values = StringArray::from(raw_string_values.clone());
1704        let binary_values = BinaryArray::from(raw_binary_value_refs);
1705        let batch = RecordBatch::try_new(
1706            Arc::new(schema),
1707            vec![Arc::new(string_values), Arc::new(binary_values)],
1708        )
1709        .unwrap();
1710
1711        roundtrip(batch, Some(SMALL_SIZE / 2));
1712    }
1713
1714    #[test]
1715    fn arrow_writer_binary_view() {
1716        let string_field = Field::new("a", DataType::Utf8View, false);
1717        let binary_field = Field::new("b", DataType::BinaryView, false);
1718        let nullable_string_field = Field::new("a", DataType::Utf8View, true);
1719        let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
1720
1721        let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
1722        let raw_binary_values = vec![
1723            b"foo".to_vec(),
1724            b"bar".to_vec(),
1725            b"large payload over 12 bytes".to_vec(),
1726            b"lulu".to_vec(),
1727        ];
1728        let nullable_string_values =
1729            vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
1730
1731        let string_view_values = StringViewArray::from(raw_string_values);
1732        let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
1733        let nullable_string_view_values = StringViewArray::from(nullable_string_values);
1734        let batch = RecordBatch::try_new(
1735            Arc::new(schema),
1736            vec![
1737                Arc::new(string_view_values),
1738                Arc::new(binary_view_values),
1739                Arc::new(nullable_string_view_values),
1740            ],
1741        )
1742        .unwrap();
1743
1744        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1745        roundtrip(batch, None);
1746    }
1747
1748    fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
1749        let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
1750        let schema = Schema::new(vec![decimal_field]);
1751
1752        let decimal_values = vec![10_000, 50_000, 0, -100]
1753            .into_iter()
1754            .map(Some)
1755            .collect::<Decimal128Array>()
1756            .with_precision_and_scale(precision, scale)
1757            .unwrap();
1758
1759        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
1760    }
1761
1762    #[test]
1763    fn arrow_writer_decimal() {
1764        // int32 to store the decimal value
1765        let batch_int32_decimal = get_decimal_batch(5, 2);
1766        roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
1767        // int64 to store the decimal value
1768        let batch_int64_decimal = get_decimal_batch(12, 2);
1769        roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
1770        // fixed_length_byte_array to store the decimal value
1771        let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
1772        roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
1773    }
1774
1775    #[test]
1776    fn arrow_writer_complex() {
1777        // define schema
1778        let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1779        let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1780        let struct_field_g = Arc::new(Field::new_list(
1781            "g",
1782            Field::new_list_field(DataType::Int16, true),
1783            false,
1784        ));
1785        let struct_field_h = Arc::new(Field::new_list(
1786            "h",
1787            Field::new_list_field(DataType::Int16, false),
1788            true,
1789        ));
1790        let struct_field_e = Arc::new(Field::new_struct(
1791            "e",
1792            vec![
1793                struct_field_f.clone(),
1794                struct_field_g.clone(),
1795                struct_field_h.clone(),
1796            ],
1797            false,
1798        ));
1799        let schema = Schema::new(vec![
1800            Field::new("a", DataType::Int32, false),
1801            Field::new("b", DataType::Int32, true),
1802            Field::new_struct(
1803                "c",
1804                vec![struct_field_d.clone(), struct_field_e.clone()],
1805                false,
1806            ),
1807        ]);
1808
1809        // create some data
1810        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1811        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1812        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1813        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1814
1815        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1816
1817        // Construct a buffer for value offsets, for the nested array:
1818        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
1819        let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1820
1821        // Construct a list array from the above two
1822        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1823            .len(5)
1824            .add_buffer(g_value_offsets.clone())
1825            .add_child_data(g_value.to_data())
1826            .build()
1827            .unwrap();
1828        let g = ListArray::from(g_list_data);
1829        // The difference between g and h is that h has a null bitmap
1830        let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
1831            .len(5)
1832            .add_buffer(g_value_offsets)
1833            .add_child_data(g_value.to_data())
1834            .null_bit_buffer(Some(Buffer::from([0b00011011])))
1835            .build()
1836            .unwrap();
1837        let h = ListArray::from(h_list_data);
1838
1839        let e = StructArray::from(vec![
1840            (struct_field_f, Arc::new(f) as ArrayRef),
1841            (struct_field_g, Arc::new(g) as ArrayRef),
1842            (struct_field_h, Arc::new(h) as ArrayRef),
1843        ]);
1844
1845        let c = StructArray::from(vec![
1846            (struct_field_d, Arc::new(d) as ArrayRef),
1847            (struct_field_e, Arc::new(e) as ArrayRef),
1848        ]);
1849
1850        // build a record batch
1851        let batch = RecordBatch::try_new(
1852            Arc::new(schema),
1853            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
1854        )
1855        .unwrap();
1856
1857        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1858        roundtrip(batch, Some(SMALL_SIZE / 3));
1859    }
1860
1861    #[test]
1862    fn arrow_writer_complex_mixed() {
1863        // This test was added while investigating https://github.com/apache/arrow-rs/issues/244.
1864        // It was subsequently fixed while investigating https://github.com/apache/arrow-rs/issues/245.
1865
1866        // define schema
1867        let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
1868        let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
1869        let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
1870        let schema = Schema::new(vec![Field::new(
1871            "some_nested_object",
1872            DataType::Struct(Fields::from(vec![
1873                offset_field.clone(),
1874                partition_field.clone(),
1875                topic_field.clone(),
1876            ])),
1877            false,
1878        )]);
1879
1880        // create some data
1881        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1882        let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1883        let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
1884
1885        let some_nested_object = StructArray::from(vec![
1886            (offset_field, Arc::new(offset) as ArrayRef),
1887            (partition_field, Arc::new(partition) as ArrayRef),
1888            (topic_field, Arc::new(topic) as ArrayRef),
1889        ]);
1890
1891        // build a record batch
1892        let batch =
1893            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1894
1895        roundtrip(batch, Some(SMALL_SIZE / 2));
1896    }
1897
1898    #[test]
1899    fn arrow_writer_map() {
1900        // Note: we are using the JSON Arrow reader for brevity
1901        let json_content = r#"
1902        {"stocks":{"long": "$AAA", "short": "$BBB"}}
1903        {"stocks":{"long": null, "long": "$CCC", "short": null}}
1904        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1905        "#;
1906        let entries_struct_type = DataType::Struct(Fields::from(vec![
1907            Field::new("key", DataType::Utf8, false),
1908            Field::new("value", DataType::Utf8, true),
1909        ]));
1910        let stocks_field = Field::new(
1911            "stocks",
1912            DataType::Map(
1913                Arc::new(Field::new("entries", entries_struct_type, false)),
1914                false,
1915            ),
1916            true,
1917        );
1918        let schema = Arc::new(Schema::new(vec![stocks_field]));
1919        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
1920        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
1921
1922        let batch = reader.next().unwrap().unwrap();
1923        roundtrip(batch, None);
1924    }
1925
1926    #[test]
1927    fn arrow_writer_2_level_struct() {
1928        // tests writing struct<struct<primitive>>
1929        let field_c = Field::new("c", DataType::Int32, true);
1930        let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
1931        let type_a = DataType::Struct(vec![field_b.clone()].into());
1932        let field_a = Field::new("a", type_a, true);
1933        let schema = Schema::new(vec![field_a.clone()]);
1934
1935        // create data
1936        let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
1937        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
1938            .len(6)
1939            .null_bit_buffer(Some(Buffer::from([0b00100111])))
1940            .add_child_data(c.into_data())
1941            .build()
1942            .unwrap();
1943        let b = StructArray::from(b_data);
1944        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
1945            .len(6)
1946            .null_bit_buffer(Some(Buffer::from([0b00101111])))
1947            .add_child_data(b.into_data())
1948            .build()
1949            .unwrap();
1950        let a = StructArray::from(a_data);
1951
1952        assert_eq!(a.null_count(), 1);
1953        assert_eq!(a.column(0).null_count(), 2);
1954
1955        // build a record batch
1956        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1957
1958        roundtrip(batch, Some(SMALL_SIZE / 2));
1959    }
1960
1961    #[test]
1962    fn arrow_writer_2_level_struct_non_null() {
1963        // tests writing struct<struct<primitive>>
1964        let field_c = Field::new("c", DataType::Int32, false);
1965        let type_b = DataType::Struct(vec![field_c].into());
1966        let field_b = Field::new("b", type_b.clone(), false);
1967        let type_a = DataType::Struct(vec![field_b].into());
1968        let field_a = Field::new("a", type_a.clone(), false);
1969        let schema = Schema::new(vec![field_a]);
1970
1971        // create data
1972        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
1973        let b_data = ArrayDataBuilder::new(type_b)
1974            .len(6)
1975            .add_child_data(c.into_data())
1976            .build()
1977            .unwrap();
1978        let b = StructArray::from(b_data);
1979        let a_data = ArrayDataBuilder::new(type_a)
1980            .len(6)
1981            .add_child_data(b.into_data())
1982            .build()
1983            .unwrap();
1984        let a = StructArray::from(a_data);
1985
1986        assert_eq!(a.null_count(), 0);
1987        assert_eq!(a.column(0).null_count(), 0);
1988
1989        // build a record batch
1990        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1991
1992        roundtrip(batch, Some(SMALL_SIZE / 2));
1993    }
1994
1995    #[test]
1996    fn arrow_writer_2_level_struct_mixed_null() {
1997        // tests writing struct<struct<primitive>>
1998        let field_c = Field::new("c", DataType::Int32, false);
1999        let type_b = DataType::Struct(vec![field_c].into());
2000        let field_b = Field::new("b", type_b.clone(), true);
2001        let type_a = DataType::Struct(vec![field_b].into());
2002        let field_a = Field::new("a", type_a.clone(), false);
2003        let schema = Schema::new(vec![field_a]);
2004
2005        // create data
2006        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2007        let b_data = ArrayDataBuilder::new(type_b)
2008            .len(6)
2009            .null_bit_buffer(Some(Buffer::from([0b00100111])))
2010            .add_child_data(c.into_data())
2011            .build()
2012            .unwrap();
2013        let b = StructArray::from(b_data);
2014        // `a` intentionally has no null buffer, to test that this is handled correctly
2015        let a_data = ArrayDataBuilder::new(type_a)
2016            .len(6)
2017            .add_child_data(b.into_data())
2018            .build()
2019            .unwrap();
2020        let a = StructArray::from(a_data);
2021
2022        assert_eq!(a.null_count(), 0);
2023        assert_eq!(a.column(0).null_count(), 2);
2024
2025        // build a record batch
2026        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2027
2028        roundtrip(batch, Some(SMALL_SIZE / 2));
2029    }
2030
2031    #[test]
2032    fn arrow_writer_2_level_struct_mixed_null_2() {
2033        // tests writing struct<struct<primitive>>, where the primitive columns are non-null.
2034        let field_c = Field::new("c", DataType::Int32, false);
2035        let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
2036        let field_e = Field::new(
2037            "e",
2038            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2039            false,
2040        );
2041
2042        let field_b = Field::new(
2043            "b",
2044            DataType::Struct(vec![field_c, field_d, field_e].into()),
2045            false,
2046        );
2047        let type_a = DataType::Struct(vec![field_b.clone()].into());
2048        let field_a = Field::new("a", type_a, true);
2049        let schema = Schema::new(vec![field_a.clone()]);
2050
2051        // create data
2052        let c = Int32Array::from_iter_values(0..6);
2053        let d = FixedSizeBinaryArray::try_from_iter(
2054            ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
2055        )
2056        .expect("four byte values");
2057        let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
2058        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2059            .len(6)
2060            .add_child_data(c.into_data())
2061            .add_child_data(d.into_data())
2062            .add_child_data(e.into_data())
2063            .build()
2064            .unwrap();
2065        let b = StructArray::from(b_data);
2066        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2067            .len(6)
2068            .null_bit_buffer(Some(Buffer::from([0b00100101])))
2069            .add_child_data(b.into_data())
2070            .build()
2071            .unwrap();
2072        let a = StructArray::from(a_data);
2073
2074        assert_eq!(a.null_count(), 3);
2075        assert_eq!(a.column(0).null_count(), 0);
2076
2077        // build a record batch
2078        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2079
2080        roundtrip(batch, Some(SMALL_SIZE / 2));
2081    }
2082
2083    #[test]
2084    fn test_fixed_size_binary_in_dict() {
2085        fn test_fixed_size_binary_in_dict_inner<K>()
2086        where
2087            K: ArrowDictionaryKeyType,
2088            K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
2089            <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
2090        {
2091            let field = Field::new(
2092                "a",
2093                DataType::Dictionary(
2094                    Box::new(K::DATA_TYPE),
2095                    Box::new(DataType::FixedSizeBinary(4)),
2096                ),
2097                false,
2098            );
2099            let schema = Schema::new(vec![field]);
2100
2101            let keys: Vec<K::Native> = vec![
2102                K::Native::try_from(0u8).unwrap(),
2103                K::Native::try_from(0u8).unwrap(),
2104                K::Native::try_from(1u8).unwrap(),
2105            ];
2106            let keys = PrimitiveArray::<K>::from_iter_values(keys);
2107            let values = FixedSizeBinaryArray::try_from_iter(
2108                vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
2109            )
2110            .unwrap();
2111
2112            let data = DictionaryArray::<K>::new(keys, Arc::new(values));
2113            let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
2114            roundtrip(batch, None);
2115        }
2116
2117        test_fixed_size_binary_in_dict_inner::<UInt8Type>();
2118        test_fixed_size_binary_in_dict_inner::<UInt16Type>();
2119        test_fixed_size_binary_in_dict_inner::<UInt32Type>();
2120        test_fixed_size_binary_in_dict_inner::<UInt64Type>();
2121        test_fixed_size_binary_in_dict_inner::<Int8Type>();
2122        test_fixed_size_binary_in_dict_inner::<Int16Type>();
2123        test_fixed_size_binary_in_dict_inner::<Int32Type>();
2124        test_fixed_size_binary_in_dict_inner::<Int64Type>();
2125    }
2126
2127    #[test]
2128    fn test_empty_dict() {
2129        let struct_fields = Fields::from(vec![Field::new(
2130            "dict",
2131            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2132            false,
2133        )]);
2134
2135        let schema = Schema::new(vec![Field::new_struct(
2136            "struct",
2137            struct_fields.clone(),
2138            true,
2139        )]);
2140        let dictionary = Arc::new(DictionaryArray::new(
2141            Int32Array::new_null(5),
2142            Arc::new(StringArray::new_null(0)),
2143        ));
2144
2145        let s = StructArray::new(
2146            struct_fields,
2147            vec![dictionary],
2148            Some(NullBuffer::new_null(5)),
2149        );
2150
2151        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
2152        roundtrip(batch, None);
2153    }

2154    #[test]
2155    fn arrow_writer_page_size() {
2156        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
2157
2158        let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
2159
2160        // Generate an array of 10 unique 10-character strings
2161        for i in 0..10 {
2162            let value = i
2163                .to_string()
2164                .repeat(10)
2165                .chars()
2166                .take(10)
2167                .collect::<String>();
2168
2169            builder.append_value(value);
2170        }
2171
2172        let array = Arc::new(builder.finish());
2173
2174        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
2175
2176        let file = tempfile::tempfile().unwrap();
2177
2178        // Set everything very low so we fall back to PLAIN encoding after the first row
2179        let props = WriterProperties::builder()
2180            .set_data_page_size_limit(1)
2181            .set_dictionary_page_size_limit(1)
2182            .set_write_batch_size(1)
2183            .build();
2184
2185        let mut writer =
2186            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
2187                .expect("Unable to write file");
2188        writer.write(&batch).unwrap();
2189        writer.close().unwrap();
2190
2191        let options = ReadOptionsBuilder::new().with_page_index().build();
2192        let reader =
2193            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap();
2194
2195        let column = reader.metadata().row_group(0).columns();
2196
2197        assert_eq!(column.len(), 1);
2198
2199        // We should write one row before falling back to PLAIN encoding so there should still be a
2200        // dictionary page.
2201        assert!(
2202            column[0].dictionary_page_offset().is_some(),
2203            "Expected a dictionary page"
2204        );
2205
2206        assert!(reader.metadata().offset_index().is_some());
2207        let offset_indexes = &reader.metadata().offset_index().unwrap()[0];
2208
2209        let page_locations = offset_indexes[0].page_locations.clone();
2210
2211        // We should fall back to PLAIN encoding after the first row, and our max page size is 1 byte,
2212        // so we expect one dictionary-encoded page and then a page per row thereafter.
2213        assert_eq!(
2214            page_locations.len(),
2215            10,
2216            "Expected 10 pages but got {page_locations:#?}"
2217        );
2218    }
2219
2220    #[test]
2221    fn arrow_writer_float_nans() {
2222        let f16_field = Field::new("a", DataType::Float16, false);
2223        let f32_field = Field::new("b", DataType::Float32, false);
2224        let f64_field = Field::new("c", DataType::Float64, false);
2225        let schema = Schema::new(vec![f16_field, f32_field, f64_field]);
2226
2227        let f16_values = (0..MEDIUM_SIZE)
2228            .map(|i| {
2229                Some(if i % 2 == 0 {
2230                    f16::NAN
2231                } else {
2232                    f16::from_f32(i as f32)
2233                })
2234            })
2235            .collect::<Float16Array>();
2236
2237        let f32_values = (0..MEDIUM_SIZE)
2238            .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
2239            .collect::<Float32Array>();
2240
2241        let f64_values = (0..MEDIUM_SIZE)
2242            .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
2243            .collect::<Float64Array>();
2244
2245        let batch = RecordBatch::try_new(
2246            Arc::new(schema),
2247            vec![
2248                Arc::new(f16_values),
2249                Arc::new(f32_values),
2250                Arc::new(f64_values),
2251            ],
2252        )
2253        .unwrap();
2254
2255        roundtrip(batch, None);
2256    }
2257
2258    const SMALL_SIZE: usize = 7;
2259    const MEDIUM_SIZE: usize = 63;
2260
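    /// Round-trips `expected_batch` with both PARQUET_1_0 and PARQUET_2_0 writer
    /// versions (optionally capping the row group size), asserting that the data read
    /// back matches, and returns the written files for further inspection.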
2261    fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<File> {
2262        let mut files = vec![];
2263        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2264            let mut props = WriterProperties::builder().set_writer_version(version);
2265
2266            if let Some(size) = max_row_group_size {
2267                props = props.set_max_row_group_size(size)
2268            }
2269
2270            let props = props.build();
2271            files.push(roundtrip_opts(&expected_batch, props))
2272        }
2273        files
2274    }
2275
2276    fn roundtrip_opts_with_array_validation<F>(
2277        expected_batch: &RecordBatch,
2278        props: WriterProperties,
2279        validate: F,
2280    ) -> File
2281    where
2282        F: Fn(&ArrayData, &ArrayData),
2283    {
2284        let file = tempfile::tempfile().unwrap();
2285
2286        let mut writer = ArrowWriter::try_new(
2287            file.try_clone().unwrap(),
2288            expected_batch.schema(),
2289            Some(props),
2290        )
2291        .expect("Unable to write file");
2292        writer.write(expected_batch).unwrap();
2293        writer.close().unwrap();
2294
2295        let mut record_batch_reader =
2296            ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
2297
2298        let actual_batch = record_batch_reader
2299            .next()
2300            .expect("No batch found")
2301            .expect("Unable to get batch");
2302
2303        assert_eq!(expected_batch.schema(), actual_batch.schema());
2304        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
2305        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
2306        for i in 0..expected_batch.num_columns() {
2307            let expected_data = expected_batch.column(i).to_data();
2308            let actual_data = actual_batch.column(i).to_data();
2309            validate(&expected_data, &actual_data);
2310        }
2311
2312        file
2313    }
2314
2315    fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> File {
2316        roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
2317            a.validate_full().expect("valid expected data");
2318            b.validate_full().expect("valid actual data");
2319            assert_eq!(a, b)
2320        })
2321    }
2322
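    /// Per-column round-trip configuration: the values to write, the schema to use,
    /// and whether (and where) bloom filters are written.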
2323    struct RoundTripOptions {
2324        values: ArrayRef,
2325        schema: SchemaRef,
2326        bloom_filter: bool,
2327        bloom_filter_position: BloomFilterPosition,
2328    }
2329
2330    impl RoundTripOptions {
2331        fn new(values: ArrayRef, nullable: bool) -> Self {
2332            let data_type = values.data_type().clone();
2333            let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
2334            Self {
2335                values,
2336                schema: Arc::new(schema),
2337                bloom_filter: false,
2338                bloom_filter_position: BloomFilterPosition::AfterRowGroup,
2339            }
2340        }
2341    }
2342
2343    fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<File> {
2344        one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
2345    }
2346
2347    fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<File> {
2348        let mut options = RoundTripOptions::new(values, false);
2349        options.schema = schema;
2350        one_column_roundtrip_with_options(options)
2351    }
2352
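    /// Round-trips a single column over every combination of dictionary size, encoding
    /// (chosen per data type), writer version, and row group size, returning one file
    /// per combination.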
2353    fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<File> {
2354        let RoundTripOptions {
2355            values,
2356            schema,
2357            bloom_filter,
2358            bloom_filter_position,
2359        } = options;
2360
2361        let encodings = match values.data_type() {
2362            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
2363                vec![
2364                    Encoding::PLAIN,
2365                    Encoding::DELTA_BYTE_ARRAY,
2366                    Encoding::DELTA_LENGTH_BYTE_ARRAY,
2367                ]
2368            }
2369            DataType::Int64
2370            | DataType::Int32
2371            | DataType::Int16
2372            | DataType::Int8
2373            | DataType::UInt64
2374            | DataType::UInt32
2375            | DataType::UInt16
2376            | DataType::UInt8 => vec![
2377                Encoding::PLAIN,
2378                Encoding::DELTA_BINARY_PACKED,
2379                Encoding::BYTE_STREAM_SPLIT,
2380            ],
2381            DataType::Float32 | DataType::Float64 => {
2382                vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
2383            }
2384            _ => vec![Encoding::PLAIN],
2385        };
2386
2387        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2388
2389        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2390
2391        let mut files = vec![];
2392        for dictionary_size in [0, 1, 1024] {
2393            for encoding in &encodings {
2394                for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2395                    for row_group_size in row_group_sizes {
2396                        let props = WriterProperties::builder()
2397                            .set_writer_version(version)
2398                            .set_max_row_group_size(row_group_size)
2399                            .set_dictionary_enabled(dictionary_size != 0)
2400                            .set_dictionary_page_size_limit(dictionary_size.max(1))
2401                            .set_encoding(*encoding)
2402                            .set_bloom_filter_enabled(bloom_filter)
2403                            .set_bloom_filter_position(bloom_filter_position)
2404                            .build();
2405
2406                        files.push(roundtrip_opts(&expected_batch, props))
2407                    }
2408                }
2409            }
2410        }
2411        files
2412    }
2413
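    /// Round-trips `iter` as a single required (non-nullable) column.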
2414    fn values_required<A, I>(iter: I) -> Vec<File>
2415    where
2416        A: From<Vec<I::Item>> + Array + 'static,
2417        I: IntoIterator,
2418    {
2419        let raw_values: Vec<_> = iter.into_iter().collect();
2420        let values = Arc::new(A::from(raw_values));
2421        one_column_roundtrip(values, false)
2422    }
2423
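    /// Round-trips `iter` as a single nullable column, with every other slot nulled out.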
2424    fn values_optional<A, I>(iter: I) -> Vec<File>
2425    where
2426        A: From<Vec<Option<I::Item>>> + Array + 'static,
2427        I: IntoIterator,
2428    {
2429        let optional_raw_values: Vec<_> = iter
2430            .into_iter()
2431            .enumerate()
2432            .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
2433            .collect();
2434        let optional_values = Arc::new(A::from(optional_raw_values));
2435        one_column_roundtrip(optional_values, true)
2436    }
2437
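    /// Runs both the required and the optional single-column round-trips for `iter`.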
2438    fn required_and_optional<A, I>(iter: I)
2439    where
2440        A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
2441        I: IntoIterator + Clone,
2442    {
2443        values_required::<A, I>(iter.clone());
2444        values_optional::<A, I>(iter);
2445    }
2446
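    /// Opens the first written file with bloom filter reading enabled, collects the
    /// bloom filter for `file_column` from every row group, and asserts that each
    /// entry in `positive_values` is found in at least one filter and each entry in
    /// `negative_values` in none.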
2447    fn check_bloom_filter<T: AsBytes>(
2448        files: Vec<File>,
2449        file_column: String,
2450        positive_values: Vec<T>,
2451        negative_values: Vec<T>,
2452    ) {
2453        files.into_iter().take(1).for_each(|file| {
2454            let file_reader = SerializedFileReader::new_with_options(
2455                file,
2456                ReadOptionsBuilder::new()
2457                    .with_reader_properties(
2458                        ReaderProperties::builder()
2459                            .set_read_bloom_filter(true)
2460                            .build(),
2461                    )
2462                    .build(),
2463            )
2464            .expect("Unable to open file as Parquet");
2465            let metadata = file_reader.metadata();
2466
2467            // Gets bloom filters from all row groups.
2468            let mut bloom_filters: Vec<_> = vec![];
2469            for (ri, row_group) in metadata.row_groups().iter().enumerate() {
2470                if let Some((column_index, _)) = row_group
2471                    .columns()
2472                    .iter()
2473                    .enumerate()
2474                    .find(|(_, column)| column.column_path().string() == file_column)
2475                {
2476                    let row_group_reader = file_reader
2477                        .get_row_group(ri)
2478                        .expect("Unable to read row group");
2479                    if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
2480                        bloom_filters.push(sbbf.clone());
2481                    } else {
2482                        panic!("No bloom filter for column named {file_column} found");
2483                    }
2484                } else {
2485                    panic!("No column named {file_column} found");
2486                }
2487            }
2488
2489            positive_values.iter().for_each(|value| {
2490                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2491                assert!(
2492                    found.is_some(),
2493                    "Value {:?} should be in bloom filter",
2494                    value.as_bytes()
2495                );
2496            });
2497
2498            negative_values.iter().for_each(|value| {
2499                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2500                assert!(
2501                    found.is_none(),
2502                    "Value {:?} should not be in bloom filter",
2503                    value.as_bytes()
2504                );
2505            });
2506        });
2507    }
2508
2509    #[test]
2510    fn all_null_primitive_single_column() {
2511        let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
2512        one_column_roundtrip(values, true);
2513    }

2514    #[test]
2515    fn null_single_column() {
2516        let values = Arc::new(NullArray::new(SMALL_SIZE));
2517        one_column_roundtrip(values, true);
2518        // null arrays are always nullable; a test with non-nullable nulls fails
2519    }
2520
2521    #[test]
2522    fn bool_single_column() {
2523        required_and_optional::<BooleanArray, _>(
2524            [true, false].iter().cycle().copied().take(SMALL_SIZE),
2525        );
2526    }
2527
2528    #[test]
2529    fn bool_large_single_column() {
2530        let values = Arc::new(
2531            [None, Some(true), Some(false)]
2532                .iter()
2533                .cycle()
2534                .copied()
2535                .take(200_000)
2536                .collect::<BooleanArray>(),
2537        );
2538        let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
2539        let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2540        let file = tempfile::tempfile().unwrap();
2541
2542        let mut writer =
2543            ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
2544                .expect("Unable to write file");
2545        writer.write(&expected_batch).unwrap();
2546        writer.close().unwrap();
2547    }
2548
2549    #[test]
2550    fn check_page_offset_index_with_nan() {
2551        let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
2552        let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
2553        let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2554
2555        let mut out = Vec::with_capacity(1024);
2556        let mut writer =
2557            ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
2558        writer.write(&batch).unwrap();
2559        let file_meta_data = writer.close().unwrap();
2560        for row_group in file_meta_data.row_groups {
2561            for column in row_group.columns {
2562                assert!(column.offset_index_offset.is_some());
2563                assert!(column.offset_index_length.is_some());
2564                assert!(column.column_index_offset.is_none());
2565                assert!(column.column_index_length.is_none());
2566            }
2567        }
2568    }
2569
2570    #[test]
2571    fn i8_single_column() {
2572        required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
2573    }
2574
2575    #[test]
2576    fn i16_single_column() {
2577        required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
2578    }
2579
2580    #[test]
2581    fn i32_single_column() {
2582        required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
2583    }
2584
2585    #[test]
2586    fn i64_single_column() {
2587        required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
2588    }
2589
2590    #[test]
2591    fn u8_single_column() {
2592        required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
2593    }
2594
2595    #[test]
2596    fn u16_single_column() {
2597        required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
2598    }
2599
2600    #[test]
2601    fn u32_single_column() {
2602        required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
2603    }
2604
2605    #[test]
2606    fn u64_single_column() {
2607        required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
2608    }
2609
2610    #[test]
2611    fn f32_single_column() {
2612        required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
2613    }
2614
2615    #[test]
2616    fn f64_single_column() {
2617        required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
2618    }
2619
2620    // The timestamp array types don't implement From<Vec<T>> because they need the timezone
2621    // argument, and they also don't support building from a Vec<Option<T>>, so call
2622    // one_column_roundtrip manually instead of calling required_and_optional for these tests.
2623
2624    #[test]
2625    fn timestamp_second_single_column() {
2626        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2627        let values = Arc::new(TimestampSecondArray::from(raw_values));
2628
2629        one_column_roundtrip(values, false);
2630    }
2631
2632    #[test]
2633    fn timestamp_millisecond_single_column() {
2634        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2635        let values = Arc::new(TimestampMillisecondArray::from(raw_values));
2636
2637        one_column_roundtrip(values, false);
2638    }
2639
2640    #[test]
2641    fn timestamp_microsecond_single_column() {
2642        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2643        let values = Arc::new(TimestampMicrosecondArray::from(raw_values));
2644
2645        one_column_roundtrip(values, false);
2646    }
2647
2648    #[test]
2649    fn timestamp_nanosecond_single_column() {
2650        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2651        let values = Arc::new(TimestampNanosecondArray::from(raw_values));
2652
2653        one_column_roundtrip(values, false);
2654    }
2655
2656    #[test]
2657    fn date32_single_column() {
2658        required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
2659    }
2660
2661    #[test]
2662    fn date64_single_column() {
2663        // Date64 must be a multiple of 86400000, see ARROW-10925
2664        required_and_optional::<Date64Array, _>(
2665            (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
2666        );
2667    }
2668
2669    #[test]
2670    fn time32_second_single_column() {
2671        required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
2672    }
2673
2674    #[test]
2675    fn time32_millisecond_single_column() {
2676        required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
2677    }
2678
2679    #[test]
2680    fn time64_microsecond_single_column() {
2681        required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
2682    }
2683
2684    #[test]
2685    fn time64_nanosecond_single_column() {
2686        required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
2687    }
2688
2689    #[test]
2690    fn duration_second_single_column() {
2691        required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
2692    }
2693
2694    #[test]
2695    fn duration_millisecond_single_column() {
2696        required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
2697    }
2698
2699    #[test]
2700    fn duration_microsecond_single_column() {
2701        required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
2702    }
2703
2704    #[test]
2705    fn duration_nanosecond_single_column() {
2706        required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
2707    }
2708
2709    #[test]
2710    fn interval_year_month_single_column() {
2711        required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
2712    }
2713
2714    #[test]
2715    fn interval_day_time_single_column() {
2716        required_and_optional::<IntervalDayTimeArray, _>(vec![
2717            IntervalDayTime::new(0, 1),
2718            IntervalDayTime::new(0, 3),
2719            IntervalDayTime::new(3, -2),
2720            IntervalDayTime::new(-200, 4),
2721        ]);
2722    }
2723
2724    #[test]
2725    #[should_panic(
2726        expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
2727    )]
2728    fn interval_month_day_nano_single_column() {
2729        required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
2730            IntervalMonthDayNano::new(0, 1, 5),
2731            IntervalMonthDayNano::new(0, 3, 2),
2732            IntervalMonthDayNano::new(3, -2, -5),
2733            IntervalMonthDayNano::new(-200, 4, -1),
2734        ]);
2735    }
2736
2737    #[test]
2738    fn binary_single_column() {
2739        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2740        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2741        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2742
2743        // BinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2744        values_required::<BinaryArray, _>(many_vecs_iter);
2745    }
2746
2747    #[test]
2748    fn binary_view_single_column() {
2749        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2750        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2751        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2752
2753        // BinaryViewArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2754        values_required::<BinaryViewArray, _>(many_vecs_iter);
2755    }
2756
2757    #[test]
2758    fn i32_column_bloom_filter_at_end() {
2759        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2760        let mut options = RoundTripOptions::new(array, false);
2761        options.bloom_filter = true;
2762        options.bloom_filter_position = BloomFilterPosition::End;
2763
2764        let files = one_column_roundtrip_with_options(options);
2765        check_bloom_filter(
2766            files,
2767            "col".to_string(),
2768            (0..SMALL_SIZE as i32).collect(),
2769            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2770        );
2771    }
2772
2773    #[test]
2774    fn i32_column_bloom_filter() {
2775        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2776        let mut options = RoundTripOptions::new(array, false);
2777        options.bloom_filter = true;
2778
2779        let files = one_column_roundtrip_with_options(options);
2780        check_bloom_filter(
2781            files,
2782            "col".to_string(),
2783            (0..SMALL_SIZE as i32).collect(),
2784            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2785        );
2786    }
2787
2788    #[test]
2789    fn binary_column_bloom_filter() {
2790        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2791        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2792        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2793
2794        let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
2795        let mut options = RoundTripOptions::new(array, false);
2796        options.bloom_filter = true;
2797
2798        let files = one_column_roundtrip_with_options(options);
2799        check_bloom_filter(
2800            files,
2801            "col".to_string(),
2802            many_vecs,
2803            vec![vec![(SMALL_SIZE + 1) as u8]],
2804        );
2805    }
2806
2807    #[test]
2808    fn empty_string_null_column_bloom_filter() {
2809        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2810        let raw_strs = raw_values.iter().map(|s| s.as_str());
2811
2812        let array = Arc::new(StringArray::from_iter_values(raw_strs));
2813        let mut options = RoundTripOptions::new(array, false);
2814        options.bloom_filter = true;
2815
2816        let files = one_column_roundtrip_with_options(options);
2817
2818        let optional_raw_values: Vec<_> = raw_values
2819            .iter()
2820            .enumerate()
2821            .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
2822            .collect();
2823        // For null slots, the empty string should not be in the bloom filter.
2824        check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
2825    }
2826
2827    #[test]
2828    fn large_binary_single_column() {
2829        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2830        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2831        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2832
2833        // LargeBinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2834        values_required::<LargeBinaryArray, _>(many_vecs_iter);
2835    }
2836
2837    #[test]
2838    fn fixed_size_binary_single_column() {
2839        let mut builder = FixedSizeBinaryBuilder::new(4);
2840        builder.append_value(b"0123").unwrap();
2841        builder.append_null();
2842        builder.append_value(b"8910").unwrap();
2843        builder.append_value(b"1112").unwrap();
2844        let array = Arc::new(builder.finish());
2845
2846        one_column_roundtrip(array, true);
2847    }
2848
2849    #[test]
2850    fn string_single_column() {
2851        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2852        let raw_strs = raw_values.iter().map(|s| s.as_str());
2853
2854        required_and_optional::<StringArray, _>(raw_strs);
2855    }
2856
2857    #[test]
2858    fn large_string_single_column() {
2859        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2860        let raw_strs = raw_values.iter().map(|s| s.as_str());
2861
2862        required_and_optional::<LargeStringArray, _>(raw_strs);
2863    }
2864
2865    #[test]
2866    fn string_view_single_column() {
2867        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2868        let raw_strs = raw_values.iter().map(|s| s.as_str());
2869
2870        required_and_optional::<StringViewArray, _>(raw_strs);
2871    }
2872
2873    #[test]
2874    fn null_list_single_column() {
2875        let null_field = Field::new_list_field(DataType::Null, true);
2876        let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);
2877
2878        let schema = Schema::new(vec![list_field]);
2879
2880        // Build [[], null, [null, null]]
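        // Validity bits 0b00000101 mark list indices 0 and 2 as valid and index 1 as null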
2881        let a_values = NullArray::new(2);
2882        let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
2883        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2884            DataType::Null,
2885            true,
2886        ))))
2887        .len(3)
2888        .add_buffer(a_value_offsets)
2889        .null_bit_buffer(Some(Buffer::from([0b00000101])))
2890        .add_child_data(a_values.into_data())
2891        .build()
2892        .unwrap();
2893
2894        let a = ListArray::from(a_list_data);
2895
2896        assert!(a.is_valid(0));
2897        assert!(!a.is_valid(1));
2898        assert!(a.is_valid(2));
2899
2900        assert_eq!(a.value(0).len(), 0);
2901        assert_eq!(a.value(2).len(), 2);
2902        assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);
2903
2904        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2905        roundtrip(batch, None);
2906    }
2907
2908    #[test]
2909    fn list_single_column() {
2910        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2911        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2912        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2913            DataType::Int32,
2914            false,
2915        ))))
2916        .len(5)
2917        .add_buffer(a_value_offsets)
2918        .null_bit_buffer(Some(Buffer::from([0b00011011])))
2919        .add_child_data(a_values.into_data())
2920        .build()
2921        .unwrap();
2922
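        // Bit 2 of 0b00011011 is clear, so index 2 is the single null list entry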
2923        assert_eq!(a_list_data.null_count(), 1);
2924
2925        let a = ListArray::from(a_list_data);
2926        let values = Arc::new(a);
2927
2928        one_column_roundtrip(values, true);
2929    }
2930
2931    #[test]
2932    fn large_list_single_column() {
2933        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2934        let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
2935        let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
2936            "large_item",
2937            DataType::Int32,
2938            true,
2939        ))))
2940        .len(5)
2941        .add_buffer(a_value_offsets)
2942        .add_child_data(a_values.into_data())
2943        .null_bit_buffer(Some(Buffer::from([0b00011011])))
2944        .build()
2945        .unwrap();
2946
2947        // The validity buffer above marks index 2 as null, so the null count should be 1
2948        assert_eq!(a_list_data.null_count(), 1);
2949
2950        let a = LargeListArray::from(a_list_data);
2951        let values = Arc::new(a);
2952
2953        one_column_roundtrip(values, true);
2954    }
2955
2956    #[test]
2957    fn list_nested_nulls() {
2958        use arrow::datatypes::Int32Type;
2959        let data = vec![
2960            Some(vec![Some(1)]),
2961            Some(vec![Some(2), Some(3)]),
2962            None,
2963            Some(vec![Some(4), Some(5), None]),
2964            Some(vec![None]),
2965            Some(vec![Some(6), Some(7)]),
2966        ];
2967
2968        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
2969        one_column_roundtrip(Arc::new(list), true);
2970
2971        let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
2972        one_column_roundtrip(Arc::new(list), true);
2973    }
2974
2975    #[test]
2976    fn struct_single_column() {
2977        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2978        let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
2979        let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
2980
2981        let values = Arc::new(s);
2982        one_column_roundtrip(values, false);
2983    }
2984
2985    #[test]
2986    fn list_and_map_coerced_names() {
2987        // Create map and list with non-Parquet naming
2988        let list_field =
2989            Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
2990        let map_field = Field::new_map(
2991            "my_map",
2992            "entries",
2993            Field::new("keys", DataType::Int32, false),
2994            Field::new("values", DataType::Int32, true),
2995            false,
2996            true,
2997        );
2998
2999        let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
3000        let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();
3001
3002        let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
3003
3004        // Write data to Parquet but coerce names to match spec
3005        let props = Some(WriterProperties::builder().set_coerce_types(true).build());
3006        let file = tempfile::tempfile().unwrap();
3007        let mut writer =
3008            ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();
3009
3010        let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
3011        writer.write(&batch).unwrap();
3012        let file_metadata = writer.close().unwrap();
3013
3014        // Coerced name of "item" should be "element"
3015        assert_eq!(file_metadata.schema[3].name, "element");
3016        // Coerced name of "entries" should be "key_value"
3017        assert_eq!(file_metadata.schema[5].name, "key_value");
3018        // Coerced name of "keys" should be "key"
3019        assert_eq!(file_metadata.schema[6].name, "key");
3020        // Coerced name of "values" should be "value"
3021        assert_eq!(file_metadata.schema[7].name, "value");
3022
3023        // Double check schema after reading from the file
3024        let reader = SerializedFileReader::new(file).unwrap();
3025        let file_schema = reader.metadata().file_metadata().schema();
3026        let fields = file_schema.get_fields();
3027        let list_field = &fields[0].get_fields()[0];
3028        assert_eq!(list_field.get_fields()[0].name(), "element");
3029        let map_field = &fields[1].get_fields()[0];
3030        assert_eq!(map_field.name(), "key_value");
3031        assert_eq!(map_field.get_fields()[0].name(), "key");
3032        assert_eq!(map_field.get_fields()[1].name(), "value");
3033    }
3034
3035    #[test]
3036    fn fallback_flush_data_page() {
3037        // Tests that Fallback::flush_data_page clears all buffers correctly
3038        let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
3039        let values = Arc::new(StringArray::from(raw_values));
3040        let encodings = vec![
3041            Encoding::DELTA_BYTE_ARRAY,
3042            Encoding::DELTA_LENGTH_BYTE_ARRAY,
3043        ];
3044        let data_type = values.data_type().clone();
3045        let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
3046        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3047
3048        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
3049        let data_page_size_limit: usize = 32;
3050        let write_batch_size: usize = 16;
3051
3052        for encoding in &encodings {
3053            for row_group_size in row_group_sizes {
3054                let props = WriterProperties::builder()
3055                    .set_writer_version(WriterVersion::PARQUET_2_0)
3056                    .set_max_row_group_size(row_group_size)
3057                    .set_dictionary_enabled(false)
3058                    .set_encoding(*encoding)
3059                    .set_data_page_size_limit(data_page_size_limit)
3060                    .set_write_batch_size(write_batch_size)
3061                    .build();
3062
3063                roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
3064                    let string_array_a = StringArray::from(a.clone());
3065                    let string_array_b = StringArray::from(b.clone());
3066                    let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
3067                    let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
3068                    assert_eq!(
3069                        vec_a, vec_b,
3070                        "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
3071                    );
3072                });
3073            }
3074        }
3075    }
3076
3077    #[test]
3078    fn arrow_writer_string_dictionary() {
3079        // define schema
3080        #[allow(deprecated)]
3081        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3082            "dictionary",
3083            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3084            true,
3085            42,
3086            true,
3087        )]));
3088
3089        // create some data
3090        let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3091            .iter()
3092            .copied()
3093            .collect();
3094
3095        // build a record batch
3096        one_column_roundtrip_with_schema(Arc::new(d), schema);
3097    }
3098
3099    #[test]
3100    fn arrow_writer_test_type_compatibility() {
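        // Writes `array1` and then `array2` (a different but compatible type) to the same
        // column of one file, then asserts the batch read back equals `expected_result` in
        // `array1`'s type, i.e. later batches are coerced to the schema of the first write.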
3101        fn ensure_compatible_write<T1, T2>(array1: T1, array2: T2, expected_result: T1)
3102        where
3103            T1: Array + 'static,
3104            T2: Array + 'static,
3105        {
3106            let schema1 = Arc::new(Schema::new(vec![Field::new(
3107                "a",
3108                array1.data_type().clone(),
3109                false,
3110            )]));
3111
3112            let file = tempfile().unwrap();
3113            let mut writer =
3114                ArrowWriter::try_new(file.try_clone().unwrap(), schema1.clone(), None).unwrap();
3115
3116            let rb1 = RecordBatch::try_new(schema1.clone(), vec![Arc::new(array1)]).unwrap();
3117            writer.write(&rb1).unwrap();
3118
3119            let schema2 = Arc::new(Schema::new(vec![Field::new(
3120                "a",
3121                array2.data_type().clone(),
3122                false,
3123            )]));
3124            let rb2 = RecordBatch::try_new(schema2, vec![Arc::new(array2)]).unwrap();
3125            writer.write(&rb2).unwrap();
3126
3127            writer.close().unwrap();
3128
3129            let mut record_batch_reader =
3130                ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
3131            let actual_batch = record_batch_reader.next().unwrap().unwrap();
3132
3133            let expected_batch =
3134                RecordBatch::try_new(schema1, vec![Arc::new(expected_result)]).unwrap();
3135            assert_eq!(actual_batch, expected_batch);
3136        }
3137
3138        // check compatibility between native and dictionaries
3139
3140        ensure_compatible_write(
3141            DictionaryArray::new(
3142                UInt8Array::from_iter_values(vec![0]),
3143                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3144            ),
3145            StringArray::from_iter_values(vec!["barquet"]),
3146            DictionaryArray::new(
3147                UInt8Array::from_iter_values(vec![0, 1]),
3148                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3149            ),
3150        );
3151
3152        ensure_compatible_write(
3153            StringArray::from_iter_values(vec!["parquet"]),
3154            DictionaryArray::new(
3155                UInt8Array::from_iter_values(vec![0]),
3156                Arc::new(StringArray::from_iter_values(vec!["barquet"])),
3157            ),
3158            StringArray::from_iter_values(vec!["parquet", "barquet"]),
3159        );
3160
3161        // check compatibility between dictionaries with different key types
3162
3163        ensure_compatible_write(
3164            DictionaryArray::new(
3165                UInt8Array::from_iter_values(vec![0]),
3166                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3167            ),
3168            DictionaryArray::new(
3169                UInt16Array::from_iter_values(vec![0]),
3170                Arc::new(StringArray::from_iter_values(vec!["barquet"])),
3171            ),
3172            DictionaryArray::new(
3173                UInt8Array::from_iter_values(vec![0, 1]),
3174                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3175            ),
3176        );
3177
3178        // check compatibility between dictionaries with different value types
3179        ensure_compatible_write(
3180            DictionaryArray::new(
3181                UInt8Array::from_iter_values(vec![0]),
3182                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3183            ),
3184            DictionaryArray::new(
3185                UInt8Array::from_iter_values(vec![0]),
3186                Arc::new(LargeStringArray::from_iter_values(vec!["barquet"])),
3187            ),
3188            DictionaryArray::new(
3189                UInt8Array::from_iter_values(vec![0, 1]),
3190                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3191            ),
3192        );
3193
3194        // check compatibility between a dictionary and a native array with a different type
3195        ensure_compatible_write(
3196            DictionaryArray::new(
3197                UInt8Array::from_iter_values(vec![0]),
3198                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3199            ),
3200            LargeStringArray::from_iter_values(vec!["barquet"]),
3201            DictionaryArray::new(
3202                UInt8Array::from_iter_values(vec![0, 1]),
3203                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3204            ),
3205        );
3206
3207        // check compatibility for string types
3208
3209        ensure_compatible_write(
3210            StringArray::from_iter_values(vec!["parquet"]),
3211            LargeStringArray::from_iter_values(vec!["barquet"]),
3212            StringArray::from_iter_values(vec!["parquet", "barquet"]),
3213        );
3214
3215        ensure_compatible_write(
3216            LargeStringArray::from_iter_values(vec!["parquet"]),
3217            StringArray::from_iter_values(vec!["barquet"]),
3218            LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
3219        );
3220
3221        ensure_compatible_write(
3222            StringArray::from_iter_values(vec!["parquet"]),
3223            StringViewArray::from_iter_values(vec!["barquet"]),
3224            StringArray::from_iter_values(vec!["parquet", "barquet"]),
3225        );
3226
3227        ensure_compatible_write(
3228            StringViewArray::from_iter_values(vec!["parquet"]),
3229            StringArray::from_iter_values(vec!["barquet"]),
3230            StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
3231        );
3232
3233        ensure_compatible_write(
3234            LargeStringArray::from_iter_values(vec!["parquet"]),
3235            StringViewArray::from_iter_values(vec!["barquet"]),
3236            LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
3237        );
3238
3239        ensure_compatible_write(
3240            StringViewArray::from_iter_values(vec!["parquet"]),
3241            LargeStringArray::from_iter_values(vec!["barquet"]),
3242            StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
3243        );
3244
3245        // check compatibility for binary types
3246
3247        ensure_compatible_write(
3248            BinaryArray::from_iter_values(vec![b"parquet"]),
3249            LargeBinaryArray::from_iter_values(vec![b"barquet"]),
3250            BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3251        );
3252
3253        ensure_compatible_write(
3254            LargeBinaryArray::from_iter_values(vec![b"parquet"]),
3255            BinaryArray::from_iter_values(vec![b"barquet"]),
3256            LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3257        );
3258
3259        ensure_compatible_write(
3260            BinaryArray::from_iter_values(vec![b"parquet"]),
3261            BinaryViewArray::from_iter_values(vec![b"barquet"]),
3262            BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3263        );
3264
3265        ensure_compatible_write(
3266            BinaryViewArray::from_iter_values(vec![b"parquet"]),
3267            BinaryArray::from_iter_values(vec![b"barquet"]),
3268            BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
3269        );
3270
3271        ensure_compatible_write(
3272            BinaryViewArray::from_iter_values(vec![b"parquet"]),
3273            LargeBinaryArray::from_iter_values(vec![b"barquet"]),
3274            BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
3275        );
3276
3277        ensure_compatible_write(
3278            LargeBinaryArray::from_iter_values(vec![b"parquet"]),
3279            BinaryViewArray::from_iter_values(vec![b"barquet"]),
3280            LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3281        );
3282    }
3283
3284    #[test]
3285    fn arrow_writer_primitive_dictionary() {
3286        // define schema
3287        #[allow(deprecated)]
3288        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3289            "dictionary",
3290            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
3291            true,
3292            42,
3293            true,
3294        )]));
3295
3296        // create some data
3297        let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
3298        builder.append(12345678).unwrap();
3299        builder.append_null();
3300        builder.append(22345678).unwrap();
3301        builder.append(12345678).unwrap();
3302        let d = builder.finish();
3303
3304        one_column_roundtrip_with_schema(Arc::new(d), schema);
3305    }
3306
3307    #[test]
3308    fn arrow_writer_decimal32_dictionary() {
3309        let integers = vec![12345, 56789, 34567];
3310
3311        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3312
3313        let values = Decimal32Array::from(integers.clone())
3314            .with_precision_and_scale(5, 2)
3315            .unwrap();
3316
3317        let array = DictionaryArray::new(keys, Arc::new(values));
3318        one_column_roundtrip(Arc::new(array.clone()), true);
3319
3320        let values = Decimal32Array::from(integers)
3321            .with_precision_and_scale(9, 2)
3322            .unwrap();
3323
3324        let array = array.with_values(Arc::new(values));
3325        one_column_roundtrip(Arc::new(array), true);
3326    }
3327
3328    #[test]
3329    fn arrow_writer_decimal64_dictionary() {
3330        let integers = vec![12345, 56789, 34567];
3331
3332        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3333
3334        let values = Decimal64Array::from(integers.clone())
3335            .with_precision_and_scale(5, 2)
3336            .unwrap();
3337
3338        let array = DictionaryArray::new(keys, Arc::new(values));
3339        one_column_roundtrip(Arc::new(array.clone()), true);
3340
3341        let values = Decimal64Array::from(integers)
3342            .with_precision_and_scale(12, 2)
3343            .unwrap();
3344
3345        let array = array.with_values(Arc::new(values));
3346        one_column_roundtrip(Arc::new(array), true);
3347    }
3348
3349    #[test]
3350    fn arrow_writer_decimal128_dictionary() {
3351        let integers = vec![12345, 56789, 34567];
3352
3353        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3354
3355        let values = Decimal128Array::from(integers.clone())
3356            .with_precision_and_scale(5, 2)
3357            .unwrap();
3358
3359        let array = DictionaryArray::new(keys, Arc::new(values));
3360        one_column_roundtrip(Arc::new(array.clone()), true);
3361
3362        let values = Decimal128Array::from(integers)
3363            .with_precision_and_scale(12, 2)
3364            .unwrap();
3365
3366        let array = array.with_values(Arc::new(values));
3367        one_column_roundtrip(Arc::new(array), true);
3368    }
3369
3370    #[test]
3371    fn arrow_writer_decimal256_dictionary() {
3372        let integers = vec![
3373            i256::from_i128(12345),
3374            i256::from_i128(56789),
3375            i256::from_i128(34567),
3376        ];
3377
3378        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3379
3380        let values = Decimal256Array::from(integers.clone())
3381            .with_precision_and_scale(5, 2)
3382            .unwrap();
3383
3384        let array = DictionaryArray::new(keys, Arc::new(values));
3385        one_column_roundtrip(Arc::new(array.clone()), true);
3386
3387        let values = Decimal256Array::from(integers)
3388            .with_precision_and_scale(12, 2)
3389            .unwrap();
3390
3391        let array = array.with_values(Arc::new(values));
3392        one_column_roundtrip(Arc::new(array), true);
3393    }
3394
3395    #[test]
3396    fn arrow_writer_string_dictionary_unsigned_index() {
3397        // define schema
3398        #[allow(deprecated)]
3399        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3400            "dictionary",
3401            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3402            true,
3403            42,
3404            true,
3405        )]));
3406
3407        // create some data
3408        let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3409            .iter()
3410            .copied()
3411            .collect();
3412
3413        one_column_roundtrip_with_schema(Arc::new(d), schema);
3414    }
3415
3416    #[test]
3417    fn u32_min_max() {
3418        // check values roundtrip through parquet
3419        let src = [
3420            u32::MIN,
3421            u32::MIN + 1,
3422            (i32::MAX as u32) - 1,
3423            i32::MAX as u32,
3424            (i32::MAX as u32) + 1,
3425            u32::MAX - 1,
3426            u32::MAX,
3427        ];
3428        let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
3429        let files = one_column_roundtrip(values, false);
3430
3431        for file in files {
3432            // check statistics are valid
3433            let reader = SerializedFileReader::new(file).unwrap();
3434            let metadata = reader.metadata();
3435
3436            let mut row_offset = 0;
3437            for row_group in metadata.row_groups() {
3438                assert_eq!(row_group.num_columns(), 1);
3439                let column = row_group.column(0);
3440
3441                let num_values = column.num_values() as usize;
3442                let src_slice = &src[row_offset..row_offset + num_values];
3443                row_offset += column.num_values() as usize;
3444
3445                let stats = column.statistics().unwrap();
3446                if let Statistics::Int32(stats) = stats {
3447                    assert_eq!(
3448                        *stats.min_opt().unwrap() as u32,
3449                        *src_slice.iter().min().unwrap()
3450                    );
3451                    assert_eq!(
3452                        *stats.max_opt().unwrap() as u32,
3453                        *src_slice.iter().max().unwrap()
3454                    );
3455                } else {
3456                    panic!("Statistics::Int32 missing")
3457                }
3458            }
3459        }
3460    }
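
    // An illustrative sketch (not used by the test above): unsigned 32-bit values are
    // written with the INT32 physical type, so statistics come back as `i32` and are
    // reinterpreted with `as u32`, as in `u32_min_max`. The cast round-trips losslessly:
    #[test]
    fn u32_as_i32_reinterpretation_sketch() {
        let original: u32 = u32::MAX;
        let stored = original as i32; // what ends up in the INT32 statistics (-1)
        assert_eq!(stored, -1);
        assert_eq!(stored as u32, original); // reading back via `as u32` is lossless
    }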
3461
3462    #[test]
3463    fn u64_min_max() {
3464        // check values roundtrip through parquet
3465        let src = [
3466            u64::MIN,
3467            u64::MIN + 1,
3468            (i64::MAX as u64) - 1,
3469            i64::MAX as u64,
3470            (i64::MAX as u64) + 1,
3471            u64::MAX - 1,
3472            u64::MAX,
3473        ];
3474        let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
3475        let files = one_column_roundtrip(values, false);
3476
3477        for file in files {
3478            // check statistics are valid
3479            let reader = SerializedFileReader::new(file).unwrap();
3480            let metadata = reader.metadata();
3481
3482            let mut row_offset = 0;
3483            for row_group in metadata.row_groups() {
3484                assert_eq!(row_group.num_columns(), 1);
3485                let column = row_group.column(0);
3486
3487                let num_values = column.num_values() as usize;
3488                let src_slice = &src[row_offset..row_offset + num_values];
3489                row_offset += column.num_values() as usize;
3490
3491                let stats = column.statistics().unwrap();
3492                if let Statistics::Int64(stats) = stats {
3493                    assert_eq!(
3494                        *stats.min_opt().unwrap() as u64,
3495                        *src_slice.iter().min().unwrap()
3496                    );
3497                    assert_eq!(
3498                        *stats.max_opt().unwrap() as u64,
3499                        *src_slice.iter().max().unwrap()
3500                    );
3501                } else {
3502                    panic!("Statistics::Int64 missing")
3503                }
3504            }
3505        }
3506    }
3507
3508    #[test]
3509    fn statistics_null_counts_only_nulls() {
3510        // check that null-count statistics for "only NULL"-columns are correct
3511        let values = Arc::new(UInt64Array::from(vec![None, None]));
3512        let files = one_column_roundtrip(values, true);
3513
3514        for file in files {
3515            // check statistics are valid
3516            let reader = SerializedFileReader::new(file).unwrap();
3517            let metadata = reader.metadata();
3518            assert_eq!(metadata.num_row_groups(), 1);
3519            let row_group = metadata.row_group(0);
3520            assert_eq!(row_group.num_columns(), 1);
3521            let column = row_group.column(0);
3522            let stats = column.statistics().unwrap();
3523            assert_eq!(stats.null_count_opt(), Some(2));
3524        }
3525    }
3526
3527    #[test]
3528    fn test_list_of_struct_roundtrip() {
3529        // define schema
3530        let int_field = Field::new("a", DataType::Int32, true);
3531        let int_field2 = Field::new("b", DataType::Int32, true);
3532
3533        let int_builder = Int32Builder::with_capacity(10);
3534        let int_builder2 = Int32Builder::with_capacity(10);
3535
3536        let struct_builder = StructBuilder::new(
3537            vec![int_field, int_field2],
3538            vec![Box::new(int_builder), Box::new(int_builder2)],
3539        );
3540        let mut list_builder = ListBuilder::new(struct_builder);
3541
3542        // Construct the following array
3543        // [{a: 1, b: 2}], [], null, [null, null], [{a: null, b: 3}], [{a: 2, b: null}]
3544
3545        // [{a: 1, b: 2}]
3546        let values = list_builder.values();
3547        values
3548            .field_builder::<Int32Builder>(0)
3549            .unwrap()
3550            .append_value(1);
3551        values
3552            .field_builder::<Int32Builder>(1)
3553            .unwrap()
3554            .append_value(2);
3555        values.append(true);
3556        list_builder.append(true);
3557
3558        // []
3559        list_builder.append(true);
3560
3561        // null
3562        list_builder.append(false);
3563
3564        // [null, null]
3565        let values = list_builder.values();
3566        values
3567            .field_builder::<Int32Builder>(0)
3568            .unwrap()
3569            .append_null();
3570        values
3571            .field_builder::<Int32Builder>(1)
3572            .unwrap()
3573            .append_null();
3574        values.append(false);
3575        values
3576            .field_builder::<Int32Builder>(0)
3577            .unwrap()
3578            .append_null();
3579        values
3580            .field_builder::<Int32Builder>(1)
3581            .unwrap()
3582            .append_null();
3583        values.append(false);
3584        list_builder.append(true);
3585
3586        // [{a: null, b: 3}]
3587        let values = list_builder.values();
3588        values
3589            .field_builder::<Int32Builder>(0)
3590            .unwrap()
3591            .append_null();
3592        values
3593            .field_builder::<Int32Builder>(1)
3594            .unwrap()
3595            .append_value(3);
3596        values.append(true);
3597        list_builder.append(true);
3598
3599        // [{a: 2, b: null}]
3600        let values = list_builder.values();
3601        values
3602            .field_builder::<Int32Builder>(0)
3603            .unwrap()
3604            .append_value(2);
3605        values
3606            .field_builder::<Int32Builder>(1)
3607            .unwrap()
3608            .append_null();
3609        values.append(true);
3610        list_builder.append(true);
3611
3612        let array = Arc::new(list_builder.finish());
3613
3614        one_column_roundtrip(array, true);
3615    }
3616
3617    fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
3618        metadata.row_groups().iter().map(|x| x.num_rows()).collect()
3619    }
3620
3621    #[test]
3622    fn test_aggregates_records() {
3623        let arrays = [
3624            Int32Array::from((0..100).collect::<Vec<_>>()),
3625            Int32Array::from((0..50).collect::<Vec<_>>()),
3626            Int32Array::from((200..500).collect::<Vec<_>>()),
3627        ];
3628
3629        let schema = Arc::new(Schema::new(vec![Field::new(
3630            "int",
3631            ArrowDataType::Int32,
3632            false,
3633        )]));
3634
3635        let file = tempfile::tempfile().unwrap();
3636
3637        let props = WriterProperties::builder()
3638            .set_max_row_group_size(200)
3639            .build();
3640
3641        let mut writer =
3642            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
3643
3644        for array in arrays {
3645            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
3646            writer.write(&batch).unwrap();
3647        }
3648
3649        writer.close().unwrap();
3650
3651        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
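        // 100 + 50 + 300 = 450 rows with a 200-row cap split into row groups of 200, 200 and 50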
3652        assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);
3653
3654        let batches = builder
3655            .with_batch_size(100)
3656            .build()
3657            .unwrap()
3658            .collect::<ArrowResult<Vec<_>>>()
3659            .unwrap();
3660
3661        assert_eq!(batches.len(), 5);
3662        assert!(batches.iter().all(|x| x.num_columns() == 1));
3663
3664        let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3665
3666        assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
3667
3668        let values: Vec<_> = batches
3669            .iter()
3670            .flat_map(|x| {
3671                x.column(0)
3672                    .as_any()
3673                    .downcast_ref::<Int32Array>()
3674                    .unwrap()
3675                    .values()
3676                    .iter()
3677                    .cloned()
3678            })
3679            .collect();
3680
3681        let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
3682        assert_eq!(&values, &expected_values)
3683    }
3684
3685    #[test]
3686    fn complex_aggregate() {
3687        // Tests aggregating nested data
3688        let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
3689        let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
3690        let struct_a = Arc::new(Field::new(
3691            "struct_a",
3692            DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
3693            true,
3694        ));
3695
3696        let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
3697        let struct_b = Arc::new(Field::new(
3698            "struct_b",
3699            DataType::Struct(vec![list_a.clone()].into()),
3700            false,
3701        ));
3702
3703        let schema = Arc::new(Schema::new(vec![struct_b]));
3704
3705        // create nested data
3706        let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
3707        let field_b_array =
3708            Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);
3709
3710        let struct_a_array = StructArray::from(vec![
3711            (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
3712            (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
3713        ]);
3714
3715        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3716            .len(5)
3717            .add_buffer(Buffer::from_iter(vec![
3718                0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
3719            ]))
3720            .null_bit_buffer(Some(Buffer::from_iter(vec![
3721                true, false, true, false, true,
3722            ])))
3723            .child_data(vec![struct_a_array.into_data()])
3724            .build()
3725            .unwrap();
3726
3727        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3728        let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);
3729
3730        let batch1 =
3731            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3732                .unwrap();
3733
3734        let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
3735        let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);
3736
3737        let struct_a_array = StructArray::from(vec![
3738            (field_a, Arc::new(field_a_array) as ArrayRef),
3739            (field_b, Arc::new(field_b_array) as ArrayRef),
3740        ]);
3741
3742        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3743            .len(2)
3744            .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
3745            .child_data(vec![struct_a_array.into_data()])
3746            .build()
3747            .unwrap();
3748
3749        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3750        let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);
3751
3752        let batch2 =
3753            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3754                .unwrap();
3755
3756        let batches = &[batch1, batch2];
3757
3758        // Verify data is as expected
3759
3760        let expected = r#"
3761            +-------------------------------------------------------------------------------------------------------+
3762            | struct_b                                                                                              |
3763            +-------------------------------------------------------------------------------------------------------+
3764            | {list: [{leaf_a: 1, leaf_b: 1}]}                                                                      |
3765            | {list: }                                                                                              |
3766            | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]}                                               |
3767            | {list: }                                                                                              |
3768            | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]}                                                |
3769            | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
3770            | {list: [{leaf_a: 10, leaf_b: }]}                                                                      |
3771            +-------------------------------------------------------------------------------------------------------+
3772        "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");
3773
3774        let actual = pretty_format_batches(batches).unwrap().to_string();
3775        assert_eq!(actual, expected);
3776
3777        // Write data
3778        let file = tempfile::tempfile().unwrap();
3779        let props = WriterProperties::builder()
3780            .set_max_row_group_size(6)
3781            .build();
3782
3783        let mut writer =
3784            ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
3785
3786        for batch in batches {
3787            writer.write(batch).unwrap();
3788        }
3789        writer.close().unwrap();
3790
3791        // Read Data
3792        // Should have written the entire first batch and the first row of the second batch
3793        // to the first row group, leaving a single row in the second row group
3794
3795        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3796        assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);
3797
3798        let batches = builder
3799            .with_batch_size(2)
3800            .build()
3801            .unwrap()
3802            .collect::<ArrowResult<Vec<_>>>()
3803            .unwrap();
3804
3805        assert_eq!(batches.len(), 4);
3806        let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3807        assert_eq!(&batch_counts, &[2, 2, 2, 1]);
3808
3809        let actual = pretty_format_batches(&batches).unwrap().to_string();
3810        assert_eq!(actual, expected);
3811    }
3812
3813    #[test]
3814    fn test_arrow_writer_metadata() {
3815        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3816        let file_schema = batch_schema.clone().with_metadata(
3817            vec![("foo".to_string(), "bar".to_string())]
3818                .into_iter()
3819                .collect(),
3820        );
3821
3822        let batch = RecordBatch::try_new(
3823            Arc::new(batch_schema),
3824            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3825        )
3826        .unwrap();
3827
3828        let mut buf = Vec::with_capacity(1024);
3829        let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
3830        writer.write(&batch).unwrap();
3831        writer.close().unwrap();
3832    }
3833
3834    #[test]
3835    fn test_arrow_writer_nullable() {
3836        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3837        let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
3838        let file_schema = Arc::new(file_schema);
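        // The batch declares the column as non-nullable while the file schema allows nulls;
        // the write should succeed and readers see the (nullable) file schema.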
3839
3840        let batch = RecordBatch::try_new(
3841            Arc::new(batch_schema),
3842            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3843        )
3844        .unwrap();
3845
3846        let mut buf = Vec::with_capacity(1024);
3847        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3848        writer.write(&batch).unwrap();
3849        writer.close().unwrap();
3850
3851        let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
3852        let back = read.next().unwrap().unwrap();
3853        assert_eq!(back.schema(), file_schema);
3854        assert_ne!(back.schema(), batch.schema());
3855        assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
3856    }
3857
3858    #[test]
3859    fn in_progress_accounting() {
3860        // define schema
3861        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3862
3863        // create some data
3864        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3865
3866        // build a record batch
3867        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
3868
3869        let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
3870
3871        // starts empty
3872        assert_eq!(writer.in_progress_size(), 0);
3873        assert_eq!(writer.in_progress_rows(), 0);
3874        assert_eq!(writer.memory_size(), 0);
3875        assert_eq!(writer.bytes_written(), 4); // Initial 4-byte "PAR1" magic header
3876        writer.write(&batch).unwrap();
3877
3878        // updated on write
3879        let initial_size = writer.in_progress_size();
3880        assert!(initial_size > 0);
3881        assert_eq!(writer.in_progress_rows(), 5);
3882        let initial_memory = writer.memory_size();
3883        assert!(initial_memory > 0);
3884        // the memory estimate is at least as large as the estimated encoded size
3885        assert!(
3886            initial_size <= initial_memory,
3887            "{initial_size} <= {initial_memory}"
3888        );
3889
3890        // updated on second write
3891        writer.write(&batch).unwrap();
3892        assert!(writer.in_progress_size() > initial_size);
3893        assert_eq!(writer.in_progress_rows(), 10);
3894        assert!(writer.memory_size() > initial_memory);
3895        assert!(
3896            writer.in_progress_size() <= writer.memory_size(),
3897            "in_progress_size {} <= memory_size {}",
3898            writer.in_progress_size(),
3899            writer.memory_size()
3900        );
3901
3902        // in progress tracking is cleared, but the overall data written is updated
3903        let pre_flush_bytes_written = writer.bytes_written();
3904        writer.flush().unwrap();
3905        assert_eq!(writer.in_progress_size(), 0);
3906        assert_eq!(writer.memory_size(), 0);
3907        assert!(writer.bytes_written() > pre_flush_bytes_written);
3908
3909        writer.close().unwrap();
3910    }
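
    // A usage sketch building on the accounting test above: buffered memory can be bounded
    // by flushing a row group whenever `in_progress_size` crosses a threshold. The 1 KiB
    // threshold here is illustrative only, not a recommended value.
    #[test]
    fn in_progress_size_flush_threshold_sketch() {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]))],
        )
        .unwrap();

        let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
        for _ in 0..100 {
            writer.write(&batch).unwrap();
            if writer.in_progress_size() > 1024 {
                // flushing writes the buffered row group and resets in-progress tracking
                writer.flush().unwrap();
                assert_eq!(writer.in_progress_size(), 0);
            }
        }
        writer.close().unwrap();
    }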
3911
3912    #[test]
3913    fn test_writer_all_null() {
3914        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3915        let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
3916        let batch = RecordBatch::try_from_iter(vec![
3917            ("a", Arc::new(a) as ArrayRef),
3918            ("b", Arc::new(b) as ArrayRef),
3919        ])
3920        .unwrap();
3921
3922        let mut buf = Vec::with_capacity(1024);
3923        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
3924        writer.write(&batch).unwrap();
3925        writer.close().unwrap();
3926
3927        let bytes = Bytes::from(buf);
3928        let options = ReadOptionsBuilder::new().with_page_index().build();
3929        let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
3930        let index = reader.metadata().offset_index().unwrap();
3931
3932        assert_eq!(index.len(), 1);
3933        assert_eq!(index[0].len(), 2); // 2 columns
3934        assert_eq!(index[0][0].page_locations().len(), 1); // 1 page
3935        assert_eq!(index[0][1].page_locations().len(), 1); // 1 page
3936    }
3937
3938    #[test]
3939    fn test_disabled_statistics_with_page() {
3940        let file_schema = Schema::new(vec![
3941            Field::new("a", DataType::Utf8, true),
3942            Field::new("b", DataType::Utf8, true),
3943        ]);
3944        let file_schema = Arc::new(file_schema);
3945
3946        let batch = RecordBatch::try_new(
3947            file_schema.clone(),
3948            vec![
3949                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3950                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3951            ],
3952        )
3953        .unwrap();
3954
3955        let props = WriterProperties::builder()
3956            .set_statistics_enabled(EnabledStatistics::None)
3957            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
3958            .build();
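        // Statistics are disabled file-wide, but column "a" is overridden to `Page`, which
        // also implies chunk-level statistics and a column index for that column.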
3959
3960        let mut buf = Vec::with_capacity(1024);
3961        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
3962        writer.write(&batch).unwrap();
3963
3964        let metadata = writer.close().unwrap();
3965        assert_eq!(metadata.row_groups.len(), 1);
3966        let row_group = &metadata.row_groups[0];
3967        assert_eq!(row_group.columns.len(), 2);
3968        // Column "a" has both offset and column index, as requested
3969        assert!(row_group.columns[0].offset_index_offset.is_some());
3970        assert!(row_group.columns[0].column_index_offset.is_some());
3971        // Column "b" should only have offset index
3972        assert!(row_group.columns[1].offset_index_offset.is_some());
3973        assert!(row_group.columns[1].column_index_offset.is_none());
3974
3975        let options = ReadOptionsBuilder::new().with_page_index().build();
3976        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
3977
3978        let row_group = reader.get_row_group(0).unwrap();
3979        let a_col = row_group.metadata().column(0);
3980        let b_col = row_group.metadata().column(1);
3981
3982        // Column chunk of column "a" should have chunk level statistics
3983        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
3984            let min = byte_array_stats.min_opt().unwrap();
3985            let max = byte_array_stats.max_opt().unwrap();
3986
3987            assert_eq!(min.as_bytes(), b"a");
3988            assert_eq!(max.as_bytes(), b"d");
3989        } else {
3990            panic!("expecting Statistics::ByteArray");
3991        }
3992
3993        // The column chunk for column "b" shouldn't have statistics
3994        assert!(b_col.statistics().is_none());
3995
3996        let offset_index = reader.metadata().offset_index().unwrap();
3997        assert_eq!(offset_index.len(), 1); // 1 row group
3998        assert_eq!(offset_index[0].len(), 2); // 2 columns
3999
4000        let column_index = reader.metadata().column_index().unwrap();
4001        assert_eq!(column_index.len(), 1); // 1 row group
4002        assert_eq!(column_index[0].len(), 2); // 2 columns
4003
4004        let a_idx = &column_index[0][0];
4005        assert!(matches!(a_idx, Index::BYTE_ARRAY(_)), "{a_idx:?}");
4006        let b_idx = &column_index[0][1];
4007        assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
4008    }
4009
4010    #[test]
4011    fn test_disabled_statistics_with_chunk() {
4012        let file_schema = Schema::new(vec![
4013            Field::new("a", DataType::Utf8, true),
4014            Field::new("b", DataType::Utf8, true),
4015        ]);
4016        let file_schema = Arc::new(file_schema);
4017
4018        let batch = RecordBatch::try_new(
4019            file_schema.clone(),
4020            vec![
4021                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
4022                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
4023            ],
4024        )
4025        .unwrap();
4026
4027        let props = WriterProperties::builder()
4028            .set_statistics_enabled(EnabledStatistics::None)
4029            .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
4030            .build();
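        // Column "a" is overridden to `Chunk`: chunk-level statistics are written for it,
        // but no column index, so both columns report `Index::NONE` below.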
4031
4032        let mut buf = Vec::with_capacity(1024);
4033        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
4034        writer.write(&batch).unwrap();
4035
4036        let metadata = writer.close().unwrap();
4037        assert_eq!(metadata.row_groups.len(), 1);
4038        let row_group = &metadata.row_groups[0];
4039        assert_eq!(row_group.columns.len(), 2);
4040        // Column "a" should only have offset index
4041        assert!(row_group.columns[0].offset_index_offset.is_some());
4042        assert!(row_group.columns[0].column_index_offset.is_none());
4043        // Column "b" should only have offset index
4044        assert!(row_group.columns[1].offset_index_offset.is_some());
4045        assert!(row_group.columns[1].column_index_offset.is_none());
4046
4047        let options = ReadOptionsBuilder::new().with_page_index().build();
4048        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
4049
4050        let row_group = reader.get_row_group(0).unwrap();
4051        let a_col = row_group.metadata().column(0);
4052        let b_col = row_group.metadata().column(1);
4053
4054        // Column chunk of column "a" should have chunk level statistics
4055        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
4056            let min = byte_array_stats.min_opt().unwrap();
4057            let max = byte_array_stats.max_opt().unwrap();
4058
4059            assert_eq!(min.as_bytes(), b"a");
4060            assert_eq!(max.as_bytes(), b"d");
4061        } else {
4062            panic!("expecting Statistics::ByteArray");
4063        }
4064
4065        // The column chunk for column "b" shouldn't have statistics
4066        assert!(b_col.statistics().is_none());
4067
4068        let column_index = reader.metadata().column_index().unwrap();
4069        assert_eq!(column_index.len(), 1); // 1 row group
4070        assert_eq!(column_index[0].len(), 2); // 2 columns
4071
4072        let a_idx = &column_index[0][0];
4073        assert!(matches!(a_idx, Index::NONE), "{a_idx:?}");
4074        let b_idx = &column_index[0][1];
4075        assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
4076    }
4077
4078    #[test]
4079    fn test_arrow_writer_skip_metadata() {
4080        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
4081        let file_schema = Arc::new(batch_schema.clone());
4082
4083        let batch = RecordBatch::try_new(
4084            Arc::new(batch_schema),
4085            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4086        )
4087        .unwrap();
4088        let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
4089
4090        let mut buf = Vec::with_capacity(1024);
4091        let mut writer =
4092            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
4093        writer.write(&batch).unwrap();
4094        writer.close().unwrap();
4095
4096        let bytes = Bytes::from(buf);
4097        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
4098        assert_eq!(file_schema, *reader_builder.schema());
4099        if let Some(key_value_metadata) = reader_builder
4100            .metadata()
4101            .file_metadata()
4102            .key_value_metadata()
4103        {
4104            assert!(!key_value_metadata
4105                .iter()
4106                .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY));
4107        }
4108    }
4109
4110    #[test]
4111    fn mismatched_schemas() {
4112        let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
4113        let file_schema = Arc::new(Schema::new(vec![Field::new(
4114            "temperature",
4115            DataType::Float64,
4116            false,
4117        )]));
4118
4119        let batch = RecordBatch::try_new(
4120            Arc::new(batch_schema),
4121            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4122        )
4123        .unwrap();
4124
4125        let mut buf = Vec::with_capacity(1024);
4126        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
4127
4128        let err = writer.write(&batch).unwrap_err().to_string();
4129        assert_eq!(
4130            err,
4131            "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
4132        );
4133    }
4134
4135    #[test]
4136    // https://github.com/apache/arrow-rs/issues/6988
4137    fn test_roundtrip_empty_schema() {
4138        // create empty record batch with empty schema
4139        let empty_batch = RecordBatch::try_new_with_options(
4140            Arc::new(Schema::empty()),
4141            vec![],
4142            &RecordBatchOptions::default().with_row_count(Some(0)),
4143        )
4144        .unwrap();
4145
4146        // write to parquet
4147        let mut parquet_bytes: Vec<u8> = Vec::new();
4148        let mut writer =
4149            ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
4150        writer.write(&empty_batch).unwrap();
4151        writer.close().unwrap();
4152
4153        // read from parquet
4154        let bytes = Bytes::from(parquet_bytes);
4155        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
4156        assert_eq!(reader.schema(), &empty_batch.schema());
4157        let batches: Vec<_> = reader
4158            .build()
4159            .unwrap()
4160            .collect::<ArrowResult<Vec<_>>>()
4161            .unwrap();
4162        assert_eq!(batches.len(), 0);
4163    }

    #[test]
    fn test_page_stats_not_written_by_default() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let schema = Schema::new(vec![string_field]);
        let raw_string_values = vec!["Blart Versenwald III"];
        let string_values = StringArray::from(raw_string_values.clone());
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();

        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let mut file = roundtrip_opts(&batch, props);

        // read file and decode page headers
        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
        let mut buf = vec![];
        file.seek(std::io::SeekFrom::Start(0)).unwrap();
        let read = file.read_to_end(&mut buf).unwrap();
        assert!(read > 0);

        // decode first page header
        let first_page = &buf[4..];
        let mut prot = TCompactSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;

        assert!(stats.is_none());
    }
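
    /// Not part of the original tests: the page-header decoding above (repeated in
    /// the next two tests) could be factored into a helper like this. It assumes,
    /// as those tests do, that the first page header starts immediately after the
    /// 4-byte "PAR1" magic (dictionaries disabled, no encryption).
    #[allow(dead_code)]
    fn read_first_page_header_sketch(file_bytes: &[u8]) -> PageHeader {
        let mut prot = TCompactSliceInputProtocol::new(&file_bytes[4..]);
        PageHeader::read_from_in_protocol(&mut prot).unwrap()
    }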

    #[test]
    fn test_page_stats_when_enabled() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let schema = Schema::new(vec![string_field]);
        let raw_string_values = vec!["Blart Versenwald III", "Andrew Lamb"];
        let string_values = StringArray::from(raw_string_values.clone());
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();

        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_write_page_header_statistics(true)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let mut file = roundtrip_opts(&batch, props);

        // read file and decode page headers
        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
        let mut buf = vec![];
        file.seek(std::io::SeekFrom::Start(0)).unwrap();
        let read = file.read_to_end(&mut buf).unwrap();
        assert!(read > 0);

        // decode first page header
        let first_page = &buf[4..];
        let mut prot = TCompactSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;

        let stats = stats.unwrap();
        // check that min/max were actually written to the page
        assert!(stats.is_max_value_exact.unwrap());
        assert!(stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Blart Versenwald III".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Andrew Lamb".as_bytes());
    }

    #[test]
    fn test_page_stats_truncation() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let binary_field = Field::new("b", DataType::Binary, false);
        let schema = Schema::new(vec![string_field, binary_field]);

        let raw_string_values = vec!["Blart Versenwald III"];
        let raw_binary_values = [b"Blart Versenwald III".to_vec()];
        let raw_binary_value_refs = raw_binary_values
            .iter()
            .map(|x| x.as_slice())
            .collect::<Vec<_>>();

        let string_values = StringArray::from(raw_string_values.clone());
        let binary_values = BinaryArray::from(raw_binary_value_refs);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(string_values), Arc::new(binary_values)],
        )
        .unwrap();

        let props = WriterProperties::builder()
            .set_statistics_truncate_length(Some(2))
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_write_page_header_statistics(true)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let mut file = roundtrip_opts(&batch, props);

        // read file and decode page headers
        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
        let mut buf = vec![];
        file.seek(std::io::SeekFrom::Start(0)).unwrap();
        let read = file.read_to_end(&mut buf).unwrap();
        assert!(read > 0);

        // decode first page header (the string column "a")
        let first_page = &buf[4..];
        let mut prot = TCompactSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;
        assert!(stats.is_some());
        let stats = stats.unwrap();
        // check that min/max were properly truncated
        assert!(!stats.is_max_value_exact.unwrap());
        assert!(!stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());

        // now check the second page (the binary column "b")
        let second_page = &prot.as_slice()[hdr.compressed_page_size as usize..];
        let mut prot = TCompactSliceInputProtocol::new(second_page);
        let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;
        assert!(stats.is_some());
        let stats = stats.unwrap();
        // check that min/max were properly truncated
        assert!(!stats.is_max_value_exact.unwrap());
        assert!(!stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
    }
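
    /// Not part of the original tests: a minimal illustration of the truncation
    /// arithmetic asserted above. With a truncate length of 2, the min statistic
    /// keeps the two-byte prefix as-is (still a lower bound), while the max
    /// statistic increments the last byte of the prefix so it still upper-bounds
    /// the full value ("Bl" -> "Bm").
    #[test]
    fn truncated_min_max_prefix_sketch() {
        let full = b"Blart Versenwald III";

        let min_prefix = full[..2].to_vec();
        let mut max_prefix = full[..2].to_vec();
        *max_prefix.last_mut().unwrap() += 1;

        assert_eq!(min_prefix, b"Bl".to_vec());
        assert_eq!(max_prefix, b"Bm".to_vec());
        // truncated bounds still bracket the original value lexicographically
        assert!(min_prefix.as_slice() <= &full[..]);
        assert!(max_prefix.as_slice() >= &full[..]);
    }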

    #[test]
    fn test_page_encoding_statistics_roundtrip() {
        let batch_schema = Schema::new(vec![Field::new(
            "int32",
            arrow_schema::DataType::Int32,
            false,
        )]);

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema.clone()),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let mut file: File = tempfile::tempfile().unwrap();
        let mut writer = ArrowWriter::try_new(&mut file, Arc::new(batch_schema), None).unwrap();
        writer.write(&batch).unwrap();
        let file_metadata = writer.close().unwrap();

        assert_eq!(file_metadata.row_groups.len(), 1);
        assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
        let chunk_meta = file_metadata.row_groups[0].columns[0]
            .meta_data
            .as_ref()
            .expect("column metadata missing");
        assert!(chunk_meta.encoding_stats.is_some());
        let chunk_page_stats = chunk_meta.encoding_stats.as_ref().unwrap();

        // check that the read metadata is also correct
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();

        let rowgroup = reader.get_row_group(0).expect("row group missing");
        assert_eq!(rowgroup.num_columns(), 1);
        let column = rowgroup.metadata().column(0);
        assert!(column.page_encoding_stats().is_some());
        let file_page_stats = column.page_encoding_stats().unwrap();
        let chunk_stats: Vec<PageEncodingStats> = chunk_page_stats
            .iter()
            .map(|x| crate::file::page_encoding_stats::try_from_thrift(x).unwrap())
            .collect();
        assert_eq!(&chunk_stats, file_page_stats);
    }

    #[test]
    fn test_different_dict_page_size_limit() {
        // Two identical Int64 columns, but col1 is allowed a dictionary page four
        // times larger than col0 via a column-specific override.
        let array = Arc::new(Int64Array::from_iter(0..1024 * 1024));
        let schema = Arc::new(Schema::new(vec![
            Field::new("col0", arrow_schema::DataType::Int64, false),
            Field::new("col1", arrow_schema::DataType::Int64, false),
        ]));
        let batch =
            arrow_array::RecordBatch::try_new(schema.clone(), vec![array.clone(), array]).unwrap();

        let props = WriterProperties::builder()
            .set_dictionary_page_size_limit(1024 * 1024)
            .set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 1024 * 1024 * 4)
            .build();
        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
        writer.write(&batch).unwrap();
        let data = Bytes::from(writer.into_inner().unwrap());

        let mut metadata = ParquetMetaDataReader::new();
        metadata.try_parse(&data).unwrap();
        let metadata = metadata.finish().unwrap();
        let col0_meta = metadata.row_group(0).column(0);
        let col1_meta = metadata.row_group(0).column(1);

        // read the first page of a column chunk and return its dictionary page size
        let get_dict_page_size = move |meta: &ColumnChunkMetaData| {
            let mut reader =
                SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
            let page = reader.get_next_page().unwrap().unwrap();
            match page {
                Page::DictionaryPage { buf, .. } => buf.len(),
                _ => panic!("expected DictionaryPage"),
            }
        };

        // each column's dictionary page is capped at its configured limit
        assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
        assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
    }
}