parquet/arrow/arrow_writer/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains writer which writes arrow data into parquet data.
19
20use bytes::Bytes;
21use std::io::{Read, Write};
22use std::iter::Peekable;
23use std::slice::Iter;
24use std::sync::{Arc, Mutex};
25use std::vec::IntoIter;
26
27use arrow_array::cast::AsArray;
28use arrow_array::types::*;
29use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
30use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};
31
32use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};
33
34use crate::arrow::ArrowSchemaConverter;
35use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
36use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
37use crate::column::page_encryption::PageEncryptor;
38use crate::column::writer::encoder::ColumnValueEncoder;
39use crate::column::writer::{
40    ColumnCloseResult, ColumnWriter, GenericColumnWriter, get_column_writer,
41};
42use crate::data_type::{ByteArray, FixedLenByteArray};
43#[cfg(feature = "encryption")]
44use crate::encryption::encrypt::FileEncryptor;
45use crate::errors::{ParquetError, Result};
46use crate::file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaData};
47use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
48use crate::file::reader::{ChunkReader, Length};
49use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
50use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
51use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
52use levels::{ArrayLevels, calculate_array_levels};
53
54mod byte_array;
55mod levels;
56
57/// Encodes [`RecordBatch`] to parquet
58///
59/// Writes Arrow `RecordBatch`es to a Parquet writer. Multiple [`RecordBatch`]es will be encoded
60/// to the same row group, up to `max_row_group_size` rows. Any remaining rows will be
61/// flushed on close, leading the final row group in the output file to potentially
62/// contain fewer than `max_row_group_size` rows.
63///
64/// # Example: Writing `RecordBatch`es
65/// ```
66/// # use std::sync::Arc;
67/// # use bytes::Bytes;
68/// # use arrow_array::{ArrayRef, Int64Array};
69/// # use arrow_array::RecordBatch;
70/// # use parquet::arrow::arrow_writer::ArrowWriter;
71/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
72/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
73/// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
74///
75/// let mut buffer = Vec::new();
76/// let mut writer = ArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
77/// writer.write(&to_write).unwrap();
78/// writer.close().unwrap();
79///
80/// let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap();
81/// let read = reader.next().unwrap().unwrap();
82///
83/// assert_eq!(to_write, read);
84/// ```
85///
86/// # Memory Usage and Limiting
87///
88/// The nature of Parquet requires buffering of an entire row group before it can
89/// be flushed to the underlying writer. Data is mostly buffered in its encoded
90/// form, reducing memory usage. However, some data such as dictionary keys,
91/// large strings or very nested data may still result in non-trivial memory
92/// usage.
93///
94/// See Also:
95/// * [`ArrowWriter::memory_size`]: the current memory usage of the writer.
96/// * [`ArrowWriter::in_progress_size`]: the estimated size of the buffered row group.
97///
98/// Call [`Self::flush`] to trigger an early flush of a row group based on a
99/// memory threshold and/or global memory pressure. However, smaller row groups
100/// result in higher metadata overheads, and thus may worsen compression ratios
101/// and query performance.
102///
103/// ```no_run
104/// # use std::io::Write;
105/// # use arrow_array::RecordBatch;
106/// # use parquet::arrow::ArrowWriter;
107/// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
108/// # let batch: RecordBatch = todo!();
109/// writer.write(&batch).unwrap();
110/// // Trigger an early flush if anticipated size exceeds 1_000_000
111/// if writer.in_progress_size() > 1_000_000 {
112///     writer.flush().unwrap();
113/// }
114/// ```
115///
116/// ## Type Support
117///
118/// The writer supports writing all Arrow [`DataType`]s that have a direct mapping to
119/// Parquet types, including [`StructArray`] and [`ListArray`].
120///
121/// The following are not supported:
122///
123/// * [`IntervalMonthDayNanoArray`]: Parquet does not [support nanosecond intervals].
124///
125/// [`DataType`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html
126/// [`StructArray`]: https://docs.rs/arrow/latest/arrow/array/struct.StructArray.html
127/// [`ListArray`]: https://docs.rs/arrow/latest/arrow/array/type.ListArray.html
128/// [`IntervalMonthDayNanoArray`]: https://docs.rs/arrow/latest/arrow/array/type.IntervalMonthDayNanoArray.html
129/// [support nanosecond intervals]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
130///
131/// ## Type Compatibility
132/// The writer can write Arrow [`RecordBatch`]s that are logically equivalent. This means that for
133/// a given column, the writer can accept multiple Arrow [`DataType`]s that contain the same
134/// value type.
135///
136/// For example, the following [`DataType`]s are all logically equivalent and can be written
137/// to the same column:
138/// * String, LargeString, StringView
139/// * Binary, LargeBinary, BinaryView
140///
141/// The writer will also accept both native and dictionary encoded arrays if the dictionaries
142/// contain compatible values.
143/// ```
144/// # use std::sync::Arc;
145/// # use arrow_array::{DictionaryArray, LargeStringArray, RecordBatch, StringArray, UInt8Array};
146/// # use arrow_schema::{DataType, Field, Schema};
147/// # use parquet::arrow::arrow_writer::ArrowWriter;
148/// let record_batch1 = RecordBatch::try_new(
149///    Arc::new(Schema::new(vec![Field::new("col", DataType::LargeUtf8, false)])),
150///    vec![Arc::new(LargeStringArray::from_iter_values(vec!["a", "b"]))]
151///  )
152/// .unwrap();
153///
154/// let mut buffer = Vec::new();
155/// let mut writer = ArrowWriter::try_new(&mut buffer, record_batch1.schema(), None).unwrap();
156/// writer.write(&record_batch1).unwrap();
157///
158/// let record_batch2 = RecordBatch::try_new(
159///     Arc::new(Schema::new(vec![Field::new(
160///         "col",
161///         DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
162///          false,
163///     )])),
164///     vec![Arc::new(DictionaryArray::new(
165///          UInt8Array::from_iter_values(vec![0, 1]),
166///          Arc::new(StringArray::from_iter_values(vec!["b", "c"])),
167///      ))],
168///  )
169///  .unwrap();
170///  writer.write(&record_batch2).unwrap();
171///  writer.close().unwrap();
172/// ```
173pub struct ArrowWriter<W: Write> {
174    /// Underlying Parquet writer
175    writer: SerializedFileWriter<W>,
176
177    /// The in-progress row group if any
178    in_progress: Option<ArrowRowGroupWriter>,
179
180    /// A copy of the Arrow schema.
181    ///
182    /// The schema is used to verify that each record batch written has the correct schema
183    arrow_schema: SchemaRef,
184
185    /// Creates new [`ArrowRowGroupWriter`] instances as required
186    row_group_writer_factory: ArrowRowGroupWriterFactory,
187
188    /// The length of arrays to write to each row group
189    max_row_group_size: usize,
190}
191
192impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
193    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194        let buffered_memory = self.in_progress_size();
195        f.debug_struct("ArrowWriter")
196            .field("writer", &self.writer)
197            .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
198            .field("in_progress_rows", &self.in_progress_rows())
199            .field("arrow_schema", &self.arrow_schema)
200            .field("max_row_group_size", &self.max_row_group_size)
201            .finish()
202    }
203}
204
205impl<W: Write + Send> ArrowWriter<W> {
206    /// Try to create a new Arrow writer
207    ///
208    /// The writer will fail if:
209    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
210    ///  * the Arrow schema contains unsupported datatypes such as Unions
211    pub fn try_new(
212        writer: W,
213        arrow_schema: SchemaRef,
214        props: Option<WriterProperties>,
215    ) -> Result<Self> {
216        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
217        Self::try_new_with_options(writer, arrow_schema, options)
218    }
219
220    /// Try to create a new Arrow writer with [`ArrowWriterOptions`].
221    ///
222    /// The writer will fail if:
223    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
224    ///  * the Arrow schema contains unsupported datatypes such as Unions
225    pub fn try_new_with_options(
226        writer: W,
227        arrow_schema: SchemaRef,
228        options: ArrowWriterOptions,
229    ) -> Result<Self> {
230        let mut props = options.properties;
231
232        let schema = if let Some(parquet_schema) = options.schema_descr {
233            parquet_schema.clone()
234        } else {
235            let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
236            if let Some(schema_root) = &options.schema_root {
237                converter = converter.schema_root(schema_root);
238            }
239
240            converter.convert(&arrow_schema)?
241        };
242
243        if !options.skip_arrow_metadata {
244            // add serialized arrow schema
245            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
246        }
247
248        let max_row_group_size = props.max_row_group_size();
249
250        let props_ptr = Arc::new(props);
251        let file_writer =
252            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;
253
254        let row_group_writer_factory =
255            ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone());
256
257        Ok(Self {
258            writer: file_writer,
259            in_progress: None,
260            arrow_schema,
261            row_group_writer_factory,
262            max_row_group_size,
263        })
264    }
265
266    /// Returns metadata for any flushed row groups
267    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
268        self.writer.flushed_row_groups()
269    }
270
271    /// Estimated memory usage, in bytes, of this `ArrowWriter`
272    ///
273    /// This estimate is formed by summing the values of
274    /// [`ArrowColumnWriter::memory_size`] for all in progress columns.
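    ///
    /// For example, a minimal sketch of flushing based on the writer's current
    /// memory usage (the threshold shown is purely illustrative):
    ///
    /// ```no_run
    /// # use arrow_array::RecordBatch;
    /// # use parquet::arrow::ArrowWriter;
    /// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
    /// # let batch: RecordBatch = todo!();
    /// writer.write(&batch).unwrap();
    /// // Trigger an early flush if the writer is buffering more than ~10 MB
    /// if writer.memory_size() > 10_000_000 {
    ///     writer.flush().unwrap();
    /// }
    /// ```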
275    pub fn memory_size(&self) -> usize {
276        match &self.in_progress {
277            Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
278            None => 0,
279        }
280    }
281
282    /// Anticipated encoded size of the in progress row group.
283    ///
284    /// This estimates the size of the row group after it has been completely
285    /// encoded, formed by summing the values of
286    /// [`ArrowColumnWriter::get_estimated_total_bytes`] for all in progress
287    /// columns.
288    pub fn in_progress_size(&self) -> usize {
289        match &self.in_progress {
290            Some(in_progress) => in_progress
291                .writers
292                .iter()
293                .map(|x| x.get_estimated_total_bytes())
294                .sum(),
295            None => 0,
296        }
297    }
298
299    /// Returns the number of rows buffered in the in progress row group
300    pub fn in_progress_rows(&self) -> usize {
301        self.in_progress
302            .as_ref()
303            .map(|x| x.buffered_rows)
304            .unwrap_or_default()
305    }
306
307    /// Returns the number of bytes written by this instance
308    pub fn bytes_written(&self) -> usize {
309        self.writer.bytes_written()
310    }
311
312    /// Encodes the provided [`RecordBatch`]
313    ///
314    /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`]
315    /// rows, the contents of `batch` will be written to one or more row groups such that all but
316    /// the final row group in the file contain [`WriterProperties::max_row_group_size`] rows.
317    ///
318    /// This will fail if the `batch`'s schema does not match the writer's schema.
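    ///
    /// For example, a minimal sketch of this splitting behavior, using an
    /// in-memory buffer and a deliberately small `max_row_group_size`:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    /// # use parquet::arrow::ArrowWriter;
    /// # use parquet::file::properties::WriterProperties;
    /// let col = Arc::new(Int32Array::from_iter_values(0..10)) as ArrayRef;
    /// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
    ///
    /// let props = WriterProperties::builder().set_max_row_group_size(4).build();
    /// let mut buffer: Vec<u8> = Vec::new();
    /// let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
    /// writer.write(&batch).unwrap();
    ///
    /// // 10 rows with a limit of 4 yields row groups of 4, 4 and 2 rows
    /// let metadata = writer.close().unwrap();
    /// assert_eq!(metadata.num_row_groups(), 3);
    /// ```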
319    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
320        if batch.num_rows() == 0 {
321            return Ok(());
322        }
323
324        let in_progress = match &mut self.in_progress {
325            Some(in_progress) => in_progress,
326            x => x.insert(
327                self.row_group_writer_factory
328                    .create_row_group_writer(self.writer.flushed_row_groups().len())?,
329            ),
330        };
331
332        // If this would exceed max_row_group_size, split the batch
333        if in_progress.buffered_rows + batch.num_rows() > self.max_row_group_size {
334            let to_write = self.max_row_group_size - in_progress.buffered_rows;
335            let a = batch.slice(0, to_write);
336            let b = batch.slice(to_write, batch.num_rows() - to_write);
337            self.write(&a)?;
338            return self.write(&b);
339        }
340
341        in_progress.write(batch)?;
342
343        if in_progress.buffered_rows >= self.max_row_group_size {
344            self.flush()?
345        }
346        Ok(())
347    }
348
349    /// Writes the given bytes to the internal buffer.
350    ///
351    /// It is safe to use this method to write data to the underlying writer,
352    /// because it ensures that the buffering and byte-counting layers are used.
353    pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
354        self.writer.write_all(buf)
355    }
356
357    /// Flushes underlying writer
358    pub fn sync(&mut self) -> std::io::Result<()> {
359        self.writer.flush()
360    }
361
362    /// Flushes all buffered rows into a new row group
363    ///
364    /// Note the underlying writer is not flushed with this call.
365    /// If this is the desired behavior, call [`ArrowWriter::sync`].
366    pub fn flush(&mut self) -> Result<()> {
367        let in_progress = match self.in_progress.take() {
368            Some(in_progress) => in_progress,
369            None => return Ok(()),
370        };
371
372        let mut row_group_writer = self.writer.next_row_group()?;
373        for chunk in in_progress.close()? {
374            chunk.append_to_row_group(&mut row_group_writer)?;
375        }
376        row_group_writer.close()?;
377        Ok(())
378    }
379
380    /// Appends additional [`KeyValue`] metadata to that specified via [`WriterProperties`]
381    ///
382    /// This method provides a way to append key-value metadata after writing `RecordBatch`es
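    ///
    /// For example, a minimal sketch (the key and value shown are illustrative):
    ///
    /// ```no_run
    /// # use parquet::arrow::ArrowWriter;
    /// # use parquet::file::metadata::KeyValue;
    /// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
    /// writer.append_key_value_metadata(KeyValue::new(
    ///     "my_key".to_string(),
    ///     "my_value".to_string(),
    /// ));
    /// ```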
383    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
384        self.writer.append_key_value_metadata(kv_metadata)
385    }
386
387    /// Returns a reference to the underlying writer.
388    pub fn inner(&self) -> &W {
389        self.writer.inner()
390    }
391
392    /// Returns a mutable reference to the underlying writer.
393    ///
394    /// **Warning**: if you write directly to this writer, you will skip
395    /// the `TrackedWrite` buffering and byte-counting layers. This will cause
396    /// the file footer's recorded offsets and sizes to diverge from reality,
397    /// resulting in an unreadable or corrupted Parquet file.
398    ///
399    /// If you want to write safely to the underlying writer, use [`Self::write_all`].
400    pub fn inner_mut(&mut self) -> &mut W {
401        self.writer.inner_mut()
402    }
403
404    /// Flushes any outstanding data and returns the underlying writer.
405    pub fn into_inner(mut self) -> Result<W> {
406        self.flush()?;
407        self.writer.into_inner()
408    }
409
410    /// Close and finalize the underlying Parquet writer
411    ///
412    /// Unlike [`Self::close`] this does not consume self
413    ///
414    /// Attempting to write after calling finish will result in an error
415    pub fn finish(&mut self) -> Result<ParquetMetaData> {
416        self.flush()?;
417        self.writer.finish()
418    }
419
420    /// Close and finalize the underlying Parquet writer
421    pub fn close(mut self) -> Result<ParquetMetaData> {
422        self.finish()
423    }
424
425    /// Create a new row group writer and return its column writers.
426    #[deprecated(
427        since = "56.2.0",
428        note = "Use `ArrowRowGroupWriterFactory` instead, see `ArrowColumnWriter` for an example"
429    )]
430    pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
431        self.flush()?;
432        let in_progress = self
433            .row_group_writer_factory
434            .create_row_group_writer(self.writer.flushed_row_groups().len())?;
435        Ok(in_progress.writers)
436    }
437
438    /// Append the given column chunks to the file as a new row group.
439    #[deprecated(
440        since = "56.2.0",
441        note = "Use `SerializedFileWriter` directly instead, see `ArrowColumnWriter` for an example"
442    )]
443    pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
444        let mut row_group_writer = self.writer.next_row_group()?;
445        for chunk in chunks {
446            chunk.append_to_row_group(&mut row_group_writer)?;
447        }
448        row_group_writer.close()?;
449        Ok(())
450    }
451
452    /// Converts this writer into a lower-level [`SerializedFileWriter`] and [`ArrowRowGroupWriterFactory`].
453    ///
454    /// Flushes any outstanding data before returning.
455    ///
456    /// This can be useful to provide more control over how files are written, for example
457    /// to write columns in parallel. See the example on [`ArrowColumnWriter`].
458    pub fn into_serialized_writer(
459        mut self,
460    ) -> Result<(SerializedFileWriter<W>, ArrowRowGroupWriterFactory)> {
461        self.flush()?;
462        Ok((self.writer, self.row_group_writer_factory))
463    }
464}
465
466impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
467    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
468        self.write(batch).map_err(|e| e.into())
469    }
470
471    fn close(self) -> std::result::Result<(), ArrowError> {
472        self.close()?;
473        Ok(())
474    }
475}
476
477/// Arrow-specific configuration settings for writing parquet files.
478///
479/// See [`ArrowWriter`] for how to configure the writer.
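///
/// # Example: Configuring an `ArrowWriter`
///
/// A minimal sketch of configuring a writer with options (the settings shown
/// are illustrative):
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
/// # use parquet::arrow::arrow_writer::{ArrowWriter, ArrowWriterOptions};
/// # use parquet::file::properties::WriterProperties;
/// let options = ArrowWriterOptions::new()
///     .with_properties(WriterProperties::default())
///     .with_skip_arrow_metadata(true);
///
/// let col = Arc::new(Int32Array::from_iter_values([1, 2, 3])) as ArrayRef;
/// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
///
/// let mut buffer: Vec<u8> = Vec::new();
/// let mut writer =
///     ArrowWriter::try_new_with_options(&mut buffer, batch.schema(), options).unwrap();
/// writer.write(&batch).unwrap();
/// writer.close().unwrap();
/// ```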
480#[derive(Debug, Clone, Default)]
481pub struct ArrowWriterOptions {
482    properties: WriterProperties,
483    skip_arrow_metadata: bool,
484    schema_root: Option<String>,
485    schema_descr: Option<SchemaDescriptor>,
486}
487
488impl ArrowWriterOptions {
489    /// Creates a new [`ArrowWriterOptions`] with the default settings.
490    pub fn new() -> Self {
491        Self::default()
492    }
493
494    /// Sets the [`WriterProperties`] for writing parquet files.
495    pub fn with_properties(self, properties: WriterProperties) -> Self {
496        Self { properties, ..self }
497    }
498
499    /// Skip encoding the embedded arrow metadata (defaults to `false`)
500    ///
501    /// Parquet files generated by the [`ArrowWriter`] contain an embedded arrow schema
502    /// by default.
503    ///
504    /// Set `skip_arrow_metadata` to `true` to skip encoding the embedded metadata.
505    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
506        Self {
507            skip_arrow_metadata,
508            ..self
509        }
510    }
511
512    /// Set the name of the root parquet schema element (defaults to `"arrow_schema"`)
513    pub fn with_schema_root(self, schema_root: String) -> Self {
514        Self {
515            schema_root: Some(schema_root),
516            ..self
517        }
518    }
519
520    /// Explicitly specify the Parquet schema to be used
521    ///
522    /// If omitted (the default), the [`ArrowSchemaConverter`] is used to compute the
523    /// Parquet [`SchemaDescriptor`]. This may be used when the [`SchemaDescriptor`] is
524    /// already known or must be calculated using custom logic.
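    ///
    /// For example, a minimal sketch that computes the [`SchemaDescriptor`] up
    /// front (any custom logic could be used instead) and passes it in explicitly:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::ArrowSchemaConverter;
    /// # use parquet::arrow::arrow_writer::{ArrowWriter, ArrowWriterOptions};
    /// let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Int32, false)]));
    ///
    /// let parquet_schema = ArrowSchemaConverter::new().convert(&schema).unwrap();
    /// let options = ArrowWriterOptions::new().with_parquet_schema(parquet_schema);
    ///
    /// let mut buffer: Vec<u8> = Vec::new();
    /// let writer = ArrowWriter::try_new_with_options(&mut buffer, schema, options).unwrap();
    /// # writer.close().unwrap();
    /// ```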
525    pub fn with_parquet_schema(self, schema_descr: SchemaDescriptor) -> Self {
526        Self {
527            schema_descr: Some(schema_descr),
528            ..self
529        }
530    }
531}
532
533/// A single column chunk produced by [`ArrowColumnWriter`]
534#[derive(Default)]
535struct ArrowColumnChunkData {
536    length: usize,
537    data: Vec<Bytes>,
538}
539
540impl Length for ArrowColumnChunkData {
541    fn len(&self) -> u64 {
542        self.length as _
543    }
544}
545
546impl ChunkReader for ArrowColumnChunkData {
547    type T = ArrowColumnChunkReader;
548
549    fn get_read(&self, start: u64) -> Result<Self::T> {
550        assert_eq!(start, 0); // Assume append_column writes all data in one-shot
551        Ok(ArrowColumnChunkReader(
552            self.data.clone().into_iter().peekable(),
553        ))
554    }
555
556    fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
557        unimplemented!()
558    }
559}
560
561/// A [`Read`] for [`ArrowColumnChunkData`]
562struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
563
564impl Read for ArrowColumnChunkReader {
565    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
566        let buffer = loop {
567            match self.0.peek_mut() {
568                Some(b) if b.is_empty() => {
569                    self.0.next();
570                    continue;
571                }
572                Some(b) => break b,
573                None => return Ok(0),
574            }
575        };
576
577        let len = buffer.len().min(out.len());
578        let b = buffer.split_to(len);
579        out[..len].copy_from_slice(&b);
580        Ok(len)
581    }
582}
583
584/// A shared [`ArrowColumnChunkData`]
585///
586/// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access via
587/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows
588type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
589
590#[derive(Default)]
591struct ArrowPageWriter {
592    buffer: SharedColumnChunk,
593    #[cfg(feature = "encryption")]
594    page_encryptor: Option<PageEncryptor>,
595}
596
597impl ArrowPageWriter {
598    #[cfg(feature = "encryption")]
599    pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
600        self.page_encryptor = page_encryptor;
601        self
602    }
603
604    #[cfg(feature = "encryption")]
605    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
606        self.page_encryptor.as_mut()
607    }
608
609    #[cfg(not(feature = "encryption"))]
610    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
611        None
612    }
613}
614
615impl PageWriter for ArrowPageWriter {
616    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
617        let page = match self.page_encryptor_mut() {
618            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
619            None => page,
620        };
621
622        let page_header = page.to_thrift_header()?;
623        let header = {
624            let mut header = Vec::with_capacity(1024);
625
626            match self.page_encryptor_mut() {
627                Some(page_encryptor) => {
628                    page_encryptor.encrypt_page_header(&page_header, &mut header)?;
629                    if page.compressed_page().is_data_page() {
630                        page_encryptor.increment_page();
631                    }
632                }
633                None => {
634                    let mut protocol = ThriftCompactOutputProtocol::new(&mut header);
635                    page_header.write_thrift(&mut protocol)?;
636                }
637            };
638
639            Bytes::from(header)
640        };
641
642        let mut buf = self.buffer.try_lock().unwrap();
643
644        let data = page.compressed_page().buffer().clone();
645        let compressed_size = data.len() + header.len();
646
647        let mut spec = PageWriteSpec::new();
648        spec.page_type = page.page_type();
649        spec.num_values = page.num_values();
650        spec.uncompressed_size = page.uncompressed_size() + header.len();
651        spec.offset = buf.length as u64;
652        spec.compressed_size = compressed_size;
653        spec.bytes_written = compressed_size as u64;
654
655        buf.length += compressed_size;
656        buf.data.push(header);
657        buf.data.push(data);
658
659        Ok(spec)
660    }
661
662    fn close(&mut self) -> Result<()> {
663        Ok(())
664    }
665}
666
667/// A leaf column that can be encoded by [`ArrowColumnWriter`]
668#[derive(Debug)]
669pub struct ArrowLeafColumn(ArrayLevels);
670
671/// Computes the [`ArrowLeafColumn`] for a potentially nested [`ArrayRef`]
672///
673/// This function can be used along with [`get_column_writers`] to encode
674/// individual columns in parallel. See example on [`ArrowColumnWriter`]
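///
/// For example, a minimal sketch showing that a struct array with two primitive
/// children produces two leaf columns:
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int32Array, StringArray, StructArray};
/// # use arrow_schema::{DataType, Field, Fields};
/// # use parquet::arrow::arrow_writer::compute_leaves;
/// let fields = Fields::from(vec![
///     Field::new("a", DataType::Int32, false),
///     Field::new("b", DataType::Utf8, false),
/// ]);
/// let array = Arc::new(StructArray::new(
///     fields.clone(),
///     vec![
///         Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef,
///         Arc::new(StringArray::from(vec!["x", "y"])) as ArrayRef,
///     ],
///     None,
/// )) as ArrayRef;
/// let field = Field::new("s", DataType::Struct(fields), false);
///
/// let leaves = compute_leaves(&field, &array).unwrap();
/// assert_eq!(leaves.len(), 2);
/// ```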
675pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
676    let levels = calculate_array_levels(array, field)?;
677    Ok(levels.into_iter().map(ArrowLeafColumn).collect())
678}
679
680/// The data for a single column chunk, see [`ArrowColumnWriter`]
681pub struct ArrowColumnChunk {
682    data: ArrowColumnChunkData,
683    close: ColumnCloseResult,
684}
685
686impl std::fmt::Debug for ArrowColumnChunk {
687    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
688        f.debug_struct("ArrowColumnChunk")
689            .field("length", &self.data.length)
690            .finish_non_exhaustive()
691    }
692}
693
694impl ArrowColumnChunk {
695    /// Calls [`SerializedRowGroupWriter::append_column`] with this column's data
696    pub fn append_to_row_group<W: Write + Send>(
697        self,
698        writer: &mut SerializedRowGroupWriter<'_, W>,
699    ) -> Result<()> {
700        writer.append_column(&self.data, self.close)
701    }
702}
703
704/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
705///
706/// `ArrowColumnWriter` instances can be created using an [`ArrowRowGroupWriterFactory`].
707///
708/// Note: This is a low-level interface for applications that require
709/// fine-grained control of encoding (e.g. encoding using multiple threads),
710/// see [`ArrowWriter`] for a higher-level interface
711///
712/// # Example: Encoding two Arrow Arrays in Parallel
713/// ```
714/// // The arrow schema
715/// # use std::sync::Arc;
716/// # use arrow_array::*;
717/// # use arrow_schema::*;
718/// # use parquet::arrow::ArrowSchemaConverter;
719/// # use parquet::arrow::arrow_writer::{compute_leaves, ArrowColumnChunk, ArrowLeafColumn, ArrowRowGroupWriterFactory};
720/// # use parquet::file::properties::WriterProperties;
721/// # use parquet::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
722/// #
723/// let schema = Arc::new(Schema::new(vec![
724///     Field::new("i32", DataType::Int32, false),
725///     Field::new("f32", DataType::Float32, false),
726/// ]));
727///
728/// // Compute the parquet schema
729/// let props = Arc::new(WriterProperties::default());
730/// let parquet_schema = ArrowSchemaConverter::new()
731///   .with_coerce_types(props.coerce_types())
732///   .convert(&schema)
733///   .unwrap();
734///
735/// // Create parquet writer
736/// let root_schema = parquet_schema.root_schema_ptr();
737/// // write to memory in the example, but this could be a File
738/// let mut out = Vec::with_capacity(1024);
739/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone())
740///   .unwrap();
741///
742/// // Create a factory for building Arrow column writers
743/// let row_group_factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(&schema));
744/// // Create column writers for the 0th row group
745/// let col_writers = row_group_factory.create_column_writers(0).unwrap();
746///
747/// // Spawn a worker thread for each column
748/// //
749/// // Note: this is for demonstration purposes; a thread pool (e.g. rayon or tokio) would be better.
750/// // The `map` produces an iterator of `(thread handle, send channel)` tuples.
751/// let mut workers: Vec<_> = col_writers
752///     .into_iter()
753///     .map(|mut col_writer| {
754///         let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
755///         let handle = std::thread::spawn(move || {
756///             // receive Arrays to encode via the channel
757///             for col in recv {
758///                 col_writer.write(&col)?;
759///             }
760///             // once the input is complete, close the writer
761///             // to return the newly created ArrowColumnChunk
762///             col_writer.close()
763///         });
764///         (handle, send)
765///     })
766///     .collect();
767///
768/// // Start row group
769/// let mut row_group_writer: SerializedRowGroupWriter<'_, _> = writer
770///   .next_row_group()
771///   .unwrap();
772///
773/// // Create some example input columns to encode
774/// let to_write = vec![
775///     Arc::new(Int32Array::from_iter_values([1, 2, 3])) as _,
776///     Arc::new(Float32Array::from_iter_values([1., 45., -1.])) as _,
777/// ];
778///
779/// // Send the input columns to the workers
780/// let mut worker_iter = workers.iter_mut();
781/// for (arr, field) in to_write.iter().zip(&schema.fields) {
782///     for leaves in compute_leaves(field, arr).unwrap() {
783///         worker_iter.next().unwrap().1.send(leaves).unwrap();
784///     }
785/// }
786///
787/// // Wait for the workers to complete encoding, and append
788/// // the resulting column chunks to the row group (and the file)
789/// for (handle, send) in workers {
790///     drop(send); // Drop send side to signal termination
791///     // wait for the worker to send the completed chunk
792///     let chunk: ArrowColumnChunk = handle.join().unwrap().unwrap();
793///     chunk.append_to_row_group(&mut row_group_writer).unwrap();
794/// }
795/// // Close the row group which writes to the underlying file
796/// row_group_writer.close().unwrap();
797///
798/// let metadata = writer.close().unwrap();
799/// assert_eq!(metadata.file_metadata().num_rows(), 3);
800/// ```
801pub struct ArrowColumnWriter {
802    writer: ArrowColumnWriterImpl,
803    chunk: SharedColumnChunk,
804}
805
806impl std::fmt::Debug for ArrowColumnWriter {
807    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
808        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
809    }
810}
811
812enum ArrowColumnWriterImpl {
813    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
814    Column(ColumnWriter<'static>),
815}
816
817impl ArrowColumnWriter {
818    /// Write an [`ArrowLeafColumn`]
819    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
820        match &mut self.writer {
821            ArrowColumnWriterImpl::Column(c) => {
822                write_leaf(c, &col.0)?;
823            }
824            ArrowColumnWriterImpl::ByteArray(c) => {
825                write_primitive(c, col.0.array().as_ref(), &col.0)?;
826            }
827        }
828        Ok(())
829    }
830
831    /// Close this column returning the written [`ArrowColumnChunk`]
832    pub fn close(self) -> Result<ArrowColumnChunk> {
833        let close = match self.writer {
834            ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
835            ArrowColumnWriterImpl::Column(c) => c.close()?,
836        };
837        let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
838        let data = chunk.into_inner().unwrap();
839        Ok(ArrowColumnChunk { data, close })
840    }
841
842    /// Returns the estimated total memory usage by the writer.
843    ///
844    /// Unlike [`Self::get_estimated_total_bytes`], this is an estimate
845    /// of the current memory usage and not its anticipated encoded size.
846    ///
847    /// This includes:
848    /// 1. Data buffered in encoded form
849    /// 2. Data buffered in un-encoded form (e.g. `usize` dictionary keys)
850    ///
851    /// This value should be greater than or equal to [`Self::get_estimated_total_bytes`]
852    pub fn memory_size(&self) -> usize {
853        match &self.writer {
854            ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
855            ArrowColumnWriterImpl::Column(c) => c.memory_size(),
856        }
857    }
858
859    /// Returns the estimated total encoded bytes for this column writer.
860    ///
861    /// This includes:
862    /// 1. Data buffered in encoded form
863    /// 2. An estimate of how large the data buffered in un-encoded form would be once encoded
864    ///
865    /// This value should be less than or equal to [`Self::memory_size`]
866    pub fn get_estimated_total_bytes(&self) -> usize {
867        match &self.writer {
868            ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
869            ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
870        }
871    }
872}
873
874/// Encodes [`RecordBatch`] to a parquet row group
875///
876/// Note: instances of this structure are created internally by
877/// [`ArrowRowGroupWriterFactory`], but the type itself is not exposed publicly.
878///
879/// See the example on [`ArrowColumnWriter`] for how to encode columns in parallel
880#[derive(Debug)]
881struct ArrowRowGroupWriter {
882    writers: Vec<ArrowColumnWriter>,
883    schema: SchemaRef,
884    buffered_rows: usize,
885}
886
887impl ArrowRowGroupWriter {
888    fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
889        Self {
890            writers,
891            schema: arrow.clone(),
892            buffered_rows: 0,
893        }
894    }
895
896    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
897        self.buffered_rows += batch.num_rows();
898        let mut writers = self.writers.iter_mut();
899        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
900            for leaf in compute_leaves(field.as_ref(), column)? {
901                writers.next().unwrap().write(&leaf)?
902            }
903        }
904        Ok(())
905    }
906
907    fn close(self) -> Result<Vec<ArrowColumnChunk>> {
908        self.writers
909            .into_iter()
910            .map(|writer| writer.close())
911            .collect()
912    }
913}
914
915/// Factory that creates new column writers for each row group in the Parquet file.
916///
917/// You can create this structure via [`ArrowRowGroupWriterFactory::new`] or [`ArrowWriter::into_serialized_writer`].
918/// See the example on [`ArrowColumnWriter`] for how to encode columns in parallel
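///
/// For example, a minimal sketch of obtaining a factory from an [`ArrowWriter`]
/// and creating the column writers for the first row group:
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
/// # use parquet::arrow::ArrowWriter;
/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
/// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
///
/// let mut buffer: Vec<u8> = Vec::new();
/// let writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
/// // Convert into a lower-level `SerializedFileWriter` and a factory
/// let (file_writer, factory) = writer.into_serialized_writer().unwrap();
/// // Create the column writers for row group 0
/// let col_writers = factory.create_column_writers(0).unwrap();
/// assert_eq!(col_writers.len(), 1);
/// # drop(file_writer);
/// ```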
919#[derive(Debug)]
920pub struct ArrowRowGroupWriterFactory {
921    schema: SchemaDescPtr,
922    arrow_schema: SchemaRef,
923    props: WriterPropertiesPtr,
924    #[cfg(feature = "encryption")]
925    file_encryptor: Option<Arc<FileEncryptor>>,
926}
927
928impl ArrowRowGroupWriterFactory {
929    /// Create a new [`ArrowRowGroupWriterFactory`] for the provided file writer and Arrow schema
930    pub fn new<W: Write + Send>(
931        file_writer: &SerializedFileWriter<W>,
932        arrow_schema: SchemaRef,
933    ) -> Self {
934        let schema = Arc::clone(file_writer.schema_descr_ptr());
935        let props = Arc::clone(file_writer.properties());
936        Self {
937            schema,
938            arrow_schema,
939            props,
940            #[cfg(feature = "encryption")]
941            file_encryptor: file_writer.file_encryptor(),
942        }
943    }
944
945    fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
946        let writers = self.create_column_writers(row_group_index)?;
947        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
948    }
949
950    /// Create column writers for a new row group, with the given row group index
951    pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
952        let mut writers = Vec::with_capacity(self.arrow_schema.fields.len());
953        let mut leaves = self.schema.columns().iter();
954        let column_factory = self.column_writer_factory(row_group_index);
955        for field in &self.arrow_schema.fields {
956            column_factory.get_arrow_column_writer(
957                field.data_type(),
958                &self.props,
959                &mut leaves,
960                &mut writers,
961            )?;
962        }
963        Ok(writers)
964    }
965
966    #[cfg(feature = "encryption")]
967    fn column_writer_factory(&self, row_group_idx: usize) -> ArrowColumnWriterFactory {
968        ArrowColumnWriterFactory::new()
969            .with_file_encryptor(row_group_idx, self.file_encryptor.clone())
970    }
971
972    #[cfg(not(feature = "encryption"))]
973    fn column_writer_factory(&self, _row_group_idx: usize) -> ArrowColumnWriterFactory {
974        ArrowColumnWriterFactory::new()
975    }
976}
977
978/// Returns [`ArrowColumnWriter`]s for each column in a given schema
979#[deprecated(since = "57.0.0", note = "Use `ArrowRowGroupWriterFactory` instead")]
980pub fn get_column_writers(
981    parquet: &SchemaDescriptor,
982    props: &WriterPropertiesPtr,
983    arrow: &SchemaRef,
984) -> Result<Vec<ArrowColumnWriter>> {
985    let mut writers = Vec::with_capacity(arrow.fields.len());
986    let mut leaves = parquet.columns().iter();
987    let column_factory = ArrowColumnWriterFactory::new();
988    for field in &arrow.fields {
989        column_factory.get_arrow_column_writer(
990            field.data_type(),
991            props,
992            &mut leaves,
993            &mut writers,
994        )?;
995    }
996    Ok(writers)
997}
998
999/// Creates [`ArrowColumnWriter`] instances
1000struct ArrowColumnWriterFactory {
1001    #[cfg(feature = "encryption")]
1002    row_group_index: usize,
1003    #[cfg(feature = "encryption")]
1004    file_encryptor: Option<Arc<FileEncryptor>>,
1005}
1006
1007impl ArrowColumnWriterFactory {
1008    pub fn new() -> Self {
1009        Self {
1010            #[cfg(feature = "encryption")]
1011            row_group_index: 0,
1012            #[cfg(feature = "encryption")]
1013            file_encryptor: None,
1014        }
1015    }
1016
1017    #[cfg(feature = "encryption")]
1018    pub fn with_file_encryptor(
1019        mut self,
1020        row_group_index: usize,
1021        file_encryptor: Option<Arc<FileEncryptor>>,
1022    ) -> Self {
1023        self.row_group_index = row_group_index;
1024        self.file_encryptor = file_encryptor;
1025        self
1026    }
1027
1028    #[cfg(feature = "encryption")]
1029    fn create_page_writer(
1030        &self,
1031        column_descriptor: &ColumnDescPtr,
1032        column_index: usize,
1033    ) -> Result<Box<ArrowPageWriter>> {
1034        let column_path = column_descriptor.path().string();
1035        let page_encryptor = PageEncryptor::create_if_column_encrypted(
1036            &self.file_encryptor,
1037            self.row_group_index,
1038            column_index,
1039            &column_path,
1040        )?;
1041        Ok(Box::new(
1042            ArrowPageWriter::default().with_encryptor(page_encryptor),
1043        ))
1044    }
1045
1046    #[cfg(not(feature = "encryption"))]
1047    fn create_page_writer(
1048        &self,
1049        _column_descriptor: &ColumnDescPtr,
1050        _column_index: usize,
1051    ) -> Result<Box<ArrowPageWriter>> {
1052        Ok(Box::<ArrowPageWriter>::default())
1053    }
1054
1055    /// Gets the [`ArrowColumnWriter`]s for the given `data_type`, consuming the
1056    /// corresponding [`ColumnDescPtr`]s from `leaves` and appending the writers to `out`
1057    fn get_arrow_column_writer(
1058        &self,
1059        data_type: &ArrowDataType,
1060        props: &WriterPropertiesPtr,
1061        leaves: &mut Iter<'_, ColumnDescPtr>,
1062        out: &mut Vec<ArrowColumnWriter>,
1063    ) -> Result<()> {
1064        // Instantiate writers for normal columns
1065        let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1066            let page_writer = self.create_page_writer(desc, out.len())?;
1067            let chunk = page_writer.buffer.clone();
1068            let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
1069            Ok(ArrowColumnWriter {
1070                chunk,
1071                writer: ArrowColumnWriterImpl::Column(writer),
1072            })
1073        };
1074
1075        // Instantiate writers for byte arrays (e.g. Utf8, Binary, etc.)
1076        let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1077            let page_writer = self.create_page_writer(desc, out.len())?;
1078            let chunk = page_writer.buffer.clone();
1079            let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
1080            Ok(ArrowColumnWriter {
1081                chunk,
1082                writer: ArrowColumnWriterImpl::ByteArray(writer),
1083            })
1084        };
1085
1086        match data_type {
1087            _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
1088            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
1089                out.push(col(leaves.next().unwrap())?)
1090            }
1091            ArrowDataType::LargeBinary
1092            | ArrowDataType::Binary
1093            | ArrowDataType::Utf8
1094            | ArrowDataType::LargeUtf8
1095            | ArrowDataType::BinaryView
1096            | ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
1097            ArrowDataType::List(f)
1098            | ArrowDataType::LargeList(f)
1099            | ArrowDataType::FixedSizeList(f, _) => {
1100                self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
1101            }
1102            ArrowDataType::Struct(fields) => {
1103                for field in fields {
1104                    self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
1105                }
1106            }
1107            ArrowDataType::Map(f, _) => match f.data_type() {
1108                ArrowDataType::Struct(f) => {
1109                    self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
1110                    self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
1111                }
1112                _ => unreachable!("invalid map type"),
1113            },
1114            ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1115                ArrowDataType::Utf8
1116                | ArrowDataType::LargeUtf8
1117                | ArrowDataType::Binary
1118                | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
1119                ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
1120                    out.push(bytes(leaves.next().unwrap())?)
1121                }
1122                ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
1123                _ => out.push(col(leaves.next().unwrap())?),
1124            },
1125            _ => {
1126                return Err(ParquetError::NYI(format!(
1127                    "Attempting to write an Arrow type {data_type} to parquet that is not yet implemented"
1128                )));
1129            }
1130        }
1131        Ok(())
1132    }
1133}
1134
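/// Writes the values of a single leaf column to `writer`, dispatching on the
/// parquet physical type and casting the Arrow data where required
/// (e.g. `Date64` to `Int32`)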
1135fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
1136    let column = levels.array().as_ref();
1137    let indices = levels.non_null_indices();
1138    match writer {
1139        ColumnWriter::Int32ColumnWriter(typed) => {
1140            match column.data_type() {
1141                ArrowDataType::Date64 => {
1142                    // If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
1143                    let array = arrow_cast::cast(column, &ArrowDataType::Date32)?;
1144                    let array = arrow_cast::cast(&array, &ArrowDataType::Int32)?;
1145
1146                    let array = array.as_primitive::<Int32Type>();
1147                    write_primitive(typed, array.values(), levels)
1148                }
1149                ArrowDataType::UInt32 => {
1150                    let values = column.as_primitive::<UInt32Type>().values();
1151                    // follow C++ implementation and use overflow/reinterpret cast from u32 to i32 which will map
1152                    // `(i32::MAX as u32)..u32::MAX` to `i32::MIN..0`
1153                    let array = values.inner().typed_data::<i32>();
1154                    write_primitive(typed, array, levels)
1155                }
1156                ArrowDataType::Decimal32(_, _) => {
1157                    let array = column
1158                        .as_primitive::<Decimal32Type>()
1159                        .unary::<_, Int32Type>(|v| v);
1160                    write_primitive(typed, array.values(), levels)
1161                }
1162                ArrowDataType::Decimal64(_, _) => {
1163                    // use the int32 to represent the decimal with low precision
1164                    let array = column
1165                        .as_primitive::<Decimal64Type>()
1166                        .unary::<_, Int32Type>(|v| v as i32);
1167                    write_primitive(typed, array.values(), levels)
1168                }
1169                ArrowDataType::Decimal128(_, _) => {
1170                    // use the int32 to represent the decimal with low precision
1171                    let array = column
1172                        .as_primitive::<Decimal128Type>()
1173                        .unary::<_, Int32Type>(|v| v as i32);
1174                    write_primitive(typed, array.values(), levels)
1175                }
1176                ArrowDataType::Decimal256(_, _) => {
1177                    // use the int32 to represent the decimal with low precision
1178                    let array = column
1179                        .as_primitive::<Decimal256Type>()
1180                        .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1181                    write_primitive(typed, array.values(), levels)
1182                }
1183                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1184                    ArrowDataType::Decimal32(_, _) => {
1185                        let array = arrow_cast::cast(column, value_type)?;
1186                        let array = array
1187                            .as_primitive::<Decimal32Type>()
1188                            .unary::<_, Int32Type>(|v| v);
1189                        write_primitive(typed, array.values(), levels)
1190                    }
1191                    ArrowDataType::Decimal64(_, _) => {
1192                        let array = arrow_cast::cast(column, value_type)?;
1193                        let array = array
1194                            .as_primitive::<Decimal64Type>()
1195                            .unary::<_, Int32Type>(|v| v as i32);
1196                        write_primitive(typed, array.values(), levels)
1197                    }
1198                    ArrowDataType::Decimal128(_, _) => {
1199                        let array = arrow_cast::cast(column, value_type)?;
1200                        let array = array
1201                            .as_primitive::<Decimal128Type>()
1202                            .unary::<_, Int32Type>(|v| v as i32);
1203                        write_primitive(typed, array.values(), levels)
1204                    }
1205                    ArrowDataType::Decimal256(_, _) => {
1206                        let array = arrow_cast::cast(column, value_type)?;
1207                        let array = array
1208                            .as_primitive::<Decimal256Type>()
1209                            .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1210                        write_primitive(typed, array.values(), levels)
1211                    }
1212                    _ => {
1213                        let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
1214                        let array = array.as_primitive::<Int32Type>();
1215                        write_primitive(typed, array.values(), levels)
1216                    }
1217                },
1218                _ => {
1219                    let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
1220                    let array = array.as_primitive::<Int32Type>();
1221                    write_primitive(typed, array.values(), levels)
1222                }
1223            }
1224        }
1225        ColumnWriter::BoolColumnWriter(typed) => {
1226            let array = column.as_boolean();
1227            typed.write_batch(
1228                get_bool_array_slice(array, indices).as_slice(),
1229                levels.def_levels(),
1230                levels.rep_levels(),
1231            )
1232        }
1233        ColumnWriter::Int64ColumnWriter(typed) => {
1234            match column.data_type() {
1235                ArrowDataType::Date64 => {
1236                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1237
1238                    let array = array.as_primitive::<Int64Type>();
1239                    write_primitive(typed, array.values(), levels)
1240                }
1241                ArrowDataType::Int64 => {
1242                    let array = column.as_primitive::<Int64Type>();
1243                    write_primitive(typed, array.values(), levels)
1244                }
1245                ArrowDataType::UInt64 => {
1246                    let values = column.as_primitive::<UInt64Type>().values();
1247                    // follow C++ implementation and use overflow/reinterpret cast from u64 to i64 which will map
1248                    // `(i64::MAX as u64)..u64::MAX` to `i64::MIN..0`
1249                    let array = values.inner().typed_data::<i64>();
1250                    write_primitive(typed, array, levels)
1251                }
1252                ArrowDataType::Decimal64(_, _) => {
1253                    let array = column
1254                        .as_primitive::<Decimal64Type>()
1255                        .unary::<_, Int64Type>(|v| v);
1256                    write_primitive(typed, array.values(), levels)
1257                }
1258                ArrowDataType::Decimal128(_, _) => {
1259                    // use the int64 to represent the decimal with low precision
1260                    let array = column
1261                        .as_primitive::<Decimal128Type>()
1262                        .unary::<_, Int64Type>(|v| v as i64);
1263                    write_primitive(typed, array.values(), levels)
1264                }
1265                ArrowDataType::Decimal256(_, _) => {
1266                    // use the int64 to represent the decimal with low precision
1267                    let array = column
1268                        .as_primitive::<Decimal256Type>()
1269                        .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1270                    write_primitive(typed, array.values(), levels)
1271                }
1272                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1273                    ArrowDataType::Decimal64(_, _) => {
1274                        let array = arrow_cast::cast(column, value_type)?;
1275                        let array = array
1276                            .as_primitive::<Decimal64Type>()
1277                            .unary::<_, Int64Type>(|v| v);
1278                        write_primitive(typed, array.values(), levels)
1279                    }
1280                    ArrowDataType::Decimal128(_, _) => {
1281                        let array = arrow_cast::cast(column, value_type)?;
1282                        let array = array
1283                            .as_primitive::<Decimal128Type>()
1284                            .unary::<_, Int64Type>(|v| v as i64);
1285                        write_primitive(typed, array.values(), levels)
1286                    }
1287                    ArrowDataType::Decimal256(_, _) => {
1288                        let array = arrow_cast::cast(column, value_type)?;
1289                        let array = array
1290                            .as_primitive::<Decimal256Type>()
1291                            .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1292                        write_primitive(typed, array.values(), levels)
1293                    }
1294                    _ => {
1295                        let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1296                        let array = array.as_primitive::<Int64Type>();
1297                        write_primitive(typed, array.values(), levels)
1298                    }
1299                },
1300                _ => {
1301                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1302                    let array = array.as_primitive::<Int64Type>();
1303                    write_primitive(typed, array.values(), levels)
1304                }
1305            }
1306        }
1307        ColumnWriter::Int96ColumnWriter(_typed) => {
1308            unreachable!("Currently unreachable because data type not supported")
1309        }
1310        ColumnWriter::FloatColumnWriter(typed) => {
1311            let array = column.as_primitive::<Float32Type>();
1312            write_primitive(typed, array.values(), levels)
1313        }
1314        ColumnWriter::DoubleColumnWriter(typed) => {
1315            let array = column.as_primitive::<Float64Type>();
1316            write_primitive(typed, array.values(), levels)
1317        }
1318        ColumnWriter::ByteArrayColumnWriter(_) => {
1319            unreachable!("should use ByteArrayWriter")
1320        }
1321        ColumnWriter::FixedLenByteArrayColumnWriter(typed) => {
1322            let bytes = match column.data_type() {
1323                ArrowDataType::Interval(interval_unit) => match interval_unit {
1324                    IntervalUnit::YearMonth => {
1325                        let array = column
1326                            .as_any()
1327                            .downcast_ref::<arrow_array::IntervalYearMonthArray>()
1328                            .unwrap();
1329                        get_interval_ym_array_slice(array, indices)
1330                    }
1331                    IntervalUnit::DayTime => {
1332                        let array = column
1333                            .as_any()
1334                            .downcast_ref::<arrow_array::IntervalDayTimeArray>()
1335                            .unwrap();
1336                        get_interval_dt_array_slice(array, indices)
1337                    }
1338                    _ => {
1339                        return Err(ParquetError::NYI(format!(
1340                            "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
1341                        )));
1342                    }
1343                },
1344                ArrowDataType::FixedSizeBinary(_) => {
1345                    let array = column
1346                        .as_any()
1347                        .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
1348                        .unwrap();
1349                    get_fsb_array_slice(array, indices)
1350                }
1351                ArrowDataType::Decimal32(_, _) => {
1352                    let array = column.as_primitive::<Decimal32Type>();
1353                    get_decimal_32_array_slice(array, indices)
1354                }
1355                ArrowDataType::Decimal64(_, _) => {
1356                    let array = column.as_primitive::<Decimal64Type>();
1357                    get_decimal_64_array_slice(array, indices)
1358                }
1359                ArrowDataType::Decimal128(_, _) => {
1360                    let array = column.as_primitive::<Decimal128Type>();
1361                    get_decimal_128_array_slice(array, indices)
1362                }
1363                ArrowDataType::Decimal256(_, _) => {
1364                    let array = column
1365                        .as_any()
1366                        .downcast_ref::<arrow_array::Decimal256Array>()
1367                        .unwrap();
1368                    get_decimal_256_array_slice(array, indices)
1369                }
1370                ArrowDataType::Float16 => {
1371                    let array = column.as_primitive::<Float16Type>();
1372                    get_float_16_array_slice(array, indices)
1373                }
1374                other => {
1375                    return Err(ParquetError::NYI(format!(
1376                        "Attempting to write an Arrow type {other:?} to parquet that is not yet implemented"
1377                    )));
1378                }
1379            };
1380            typed.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels())
1381        }
1382    }
1383}
1384
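/// Writes the values of a primitive column chunk along with its definition and repetition
/// levels, restricting the write to the non-null indices recorded in `levels`.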
1385fn write_primitive<E: ColumnValueEncoder>(
1386    writer: &mut GenericColumnWriter<E>,
1387    values: &E::Values,
1388    levels: &ArrayLevels,
1389) -> Result<usize> {
1390    writer.write_batch_internal(
1391        values,
1392        Some(levels.non_null_indices()),
1393        levels.def_levels(),
1394        levels.rep_levels(),
1395        None,
1396        None,
1397        None,
1398    )
1399}
1400
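/// Collects the boolean values at `indices` into a contiguous `Vec<bool>` for writing.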
1401fn get_bool_array_slice(array: &arrow_array::BooleanArray, indices: &[usize]) -> Vec<bool> {
1402    let mut values = Vec::with_capacity(indices.len());
1403    for i in indices {
1404        values.push(array.value(*i))
1405    }
1406    values
1407}
1408
1409/// Returns 12-byte values encoding months, days and milliseconds as three 4-byte little-endian integers.
1410/// An Arrow YearMonth interval only stores months, so only the first 4 bytes are populated.
1411fn get_interval_ym_array_slice(
1412    array: &arrow_array::IntervalYearMonthArray,
1413    indices: &[usize],
1414) -> Vec<FixedLenByteArray> {
1415    let mut values = Vec::with_capacity(indices.len());
1416    for i in indices {
1417        let mut value = array.value(*i).to_le_bytes().to_vec();
1418        let mut suffix = vec![0; 8];
1419        value.append(&mut suffix);
1420        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1421    }
1422    values
1423}
1424
1425/// Returns 12-byte values encoding months, days and milliseconds as three 4-byte little-endian integers.
1426/// An Arrow DayTime interval only stores days and milliseconds, so the first 4 (month) bytes are left as zero.
1427fn get_interval_dt_array_slice(
1428    array: &arrow_array::IntervalDayTimeArray,
1429    indices: &[usize],
1430) -> Vec<FixedLenByteArray> {
1431    let mut values = Vec::with_capacity(indices.len());
1432    for i in indices {
1433        let mut out = [0; 12];
1434        let value = array.value(*i);
1435        out[4..8].copy_from_slice(&value.days.to_le_bytes());
1436        out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1437        values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1438    }
1439    values
1440}
1441
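/// Converts the `Decimal32` values at `indices` to big-endian bytes, keeping only the trailing
/// bytes required for the array's precision. The 64/128/256-bit variants below follow the same pattern.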
1442fn get_decimal_32_array_slice(
1443    array: &arrow_array::Decimal32Array,
1444    indices: &[usize],
1445) -> Vec<FixedLenByteArray> {
1446    let mut values = Vec::with_capacity(indices.len());
1447    let size = decimal_length_from_precision(array.precision());
1448    for i in indices {
1449        let as_be_bytes = array.value(*i).to_be_bytes();
1450        let resized_value = as_be_bytes[(4 - size)..].to_vec();
1451        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1452    }
1453    values
1454}
1455
1456fn get_decimal_64_array_slice(
1457    array: &arrow_array::Decimal64Array,
1458    indices: &[usize],
1459) -> Vec<FixedLenByteArray> {
1460    let mut values = Vec::with_capacity(indices.len());
1461    let size = decimal_length_from_precision(array.precision());
1462    for i in indices {
1463        let as_be_bytes = array.value(*i).to_be_bytes();
1464        let resized_value = as_be_bytes[(8 - size)..].to_vec();
1465        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1466    }
1467    values
1468}
1469
1470fn get_decimal_128_array_slice(
1471    array: &arrow_array::Decimal128Array,
1472    indices: &[usize],
1473) -> Vec<FixedLenByteArray> {
1474    let mut values = Vec::with_capacity(indices.len());
1475    let size = decimal_length_from_precision(array.precision());
1476    for i in indices {
1477        let as_be_bytes = array.value(*i).to_be_bytes();
1478        let resized_value = as_be_bytes[(16 - size)..].to_vec();
1479        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1480    }
1481    values
1482}
1483
1484fn get_decimal_256_array_slice(
1485    array: &arrow_array::Decimal256Array,
1486    indices: &[usize],
1487) -> Vec<FixedLenByteArray> {
1488    let mut values = Vec::with_capacity(indices.len());
1489    let size = decimal_length_from_precision(array.precision());
1490    for i in indices {
1491        let as_be_bytes = array.value(*i).to_be_bytes();
1492        let resized_value = as_be_bytes[(32 - size)..].to_vec();
1493        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1494    }
1495    values
1496}
1497
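/// Converts the `Float16` values at `indices` into 2-byte little-endian fixed-length byte arrays.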
1498fn get_float_16_array_slice(
1499    array: &arrow_array::Float16Array,
1500    indices: &[usize],
1501) -> Vec<FixedLenByteArray> {
1502    let mut values = Vec::with_capacity(indices.len());
1503    for i in indices {
1504        let value = array.value(*i).to_le_bytes().to_vec();
1505        values.push(FixedLenByteArray::from(ByteArray::from(value)));
1506    }
1507    values
1508}
1509
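/// Copies the fixed-size binary values at `indices` into fixed-length byte arrays.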
1510fn get_fsb_array_slice(
1511    array: &arrow_array::FixedSizeBinaryArray,
1512    indices: &[usize],
1513) -> Vec<FixedLenByteArray> {
1514    let mut values = Vec::with_capacity(indices.len());
1515    for i in indices {
1516        let value = array.value(*i).to_vec();
1517        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1518    }
1519    values
1520}
1521
1522#[cfg(test)]
1523mod tests {
1524    use super::*;
1525
1526    use std::fs::File;
1527
1528    use crate::arrow::ARROW_SCHEMA_META_KEY;
1529    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1530    use crate::column::page::{Page, PageReader};
1531    use crate::file::metadata::thrift::PageHeader;
1532    use crate::file::page_index::column_index::ColumnIndexMetaData;
1533    use crate::file::reader::SerializedPageReader;
1534    use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
1535    use crate::schema::types::{ColumnPath, Type};
1536    use arrow::datatypes::ToByteSlice;
1537    use arrow::datatypes::{DataType, Schema};
1538    use arrow::error::Result as ArrowResult;
1539    use arrow::util::data_gen::create_random_array;
1540    use arrow::util::pretty::pretty_format_batches;
1541    use arrow::{array::*, buffer::Buffer};
1542    use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer, i256};
1543    use arrow_schema::Fields;
1544    use half::f16;
1545    use num_traits::{FromPrimitive, ToPrimitive};
1546    use tempfile::tempfile;
1547
1548    use crate::basic::Encoding;
1549    use crate::data_type::AsBytes;
1550    use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
1551    use crate::file::properties::{
1552        BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1553    };
1554    use crate::file::serialized_reader::ReadOptionsBuilder;
1555    use crate::file::{
1556        reader::{FileReader, SerializedFileReader},
1557        statistics::Statistics,
1558    };
1559
1560    #[test]
1561    fn arrow_writer() {
1562        // define schema
1563        let schema = Schema::new(vec![
1564            Field::new("a", DataType::Int32, false),
1565            Field::new("b", DataType::Int32, true),
1566        ]);
1567
1568        // create some data
1569        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1570        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1571
1572        // build a record batch
1573        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
1574
1575        roundtrip(batch, Some(SMALL_SIZE / 2));
1576    }
1577
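    // Writes `expected_batch`, closes the writer explicitly, and returns the encoded bytes.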
1578    fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1579        let mut buffer = vec![];
1580
1581        let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
1582        writer.write(expected_batch).unwrap();
1583        writer.close().unwrap();
1584
1585        buffer
1586    }
1587
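    // Writes `expected_batch` and recovers the encoded bytes via `into_inner` instead of an explicit `close`.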
1588    fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1589        let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
1590        writer.write(expected_batch).unwrap();
1591        writer.into_inner().unwrap()
1592    }
1593
1594    #[test]
1595    fn roundtrip_bytes() {
1596        // define schema
1597        let schema = Arc::new(Schema::new(vec![
1598            Field::new("a", DataType::Int32, false),
1599            Field::new("b", DataType::Int32, true),
1600        ]));
1601
1602        // create some data
1603        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1604        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1605
1606        // build a record batch
1607        let expected_batch =
1608            RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
1609
1610        for buffer in [
1611            get_bytes_after_close(schema.clone(), &expected_batch),
1612            get_bytes_by_into_inner(schema, &expected_batch),
1613        ] {
1614            let cursor = Bytes::from(buffer);
1615            let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
1616
1617            let actual_batch = record_batch_reader
1618                .next()
1619                .expect("No batch found")
1620                .expect("Unable to get batch");
1621
1622            assert_eq!(expected_batch.schema(), actual_batch.schema());
1623            assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
1624            assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
1625            for i in 0..expected_batch.num_columns() {
1626                let expected_data = expected_batch.column(i).to_data();
1627                let actual_data = actual_batch.column(i).to_data();
1628
1629                assert_eq!(expected_data, actual_data);
1630            }
1631        }
1632    }
1633
1634    #[test]
1635    fn arrow_writer_non_null() {
1636        // define schema
1637        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1638
1639        // create some data
1640        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1641
1642        // build a record batch
1643        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1644
1645        roundtrip(batch, Some(SMALL_SIZE / 2));
1646    }
1647
1648    #[test]
1649    fn arrow_writer_list() {
1650        // define schema
1651        let schema = Schema::new(vec![Field::new(
1652            "a",
1653            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1654            true,
1655        )]);
1656
1657        // create some data
1658        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1659
1660        // Construct a buffer for value offsets, for the nested array:
1661        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
1662        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1663
1664        // Construct a list array from the above two
1665        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1666            DataType::Int32,
1667            false,
1668        ))))
1669        .len(5)
1670        .add_buffer(a_value_offsets)
1671        .add_child_data(a_values.into_data())
1672        .null_bit_buffer(Some(Buffer::from([0b00011011])))
1673        .build()
1674        .unwrap();
1675        let a = ListArray::from(a_list_data);
1676
1677        // build a record batch
1678        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1679
1680        assert_eq!(batch.column(0).null_count(), 1);
1681
1682        // This test fails if the max row group size is less than the batch's length
1683        // see https://github.com/apache/arrow-rs/issues/518
1684        roundtrip(batch, None);
1685    }
1686
1687    #[test]
1688    fn arrow_writer_list_non_null() {
1689        // define schema
1690        let schema = Schema::new(vec![Field::new(
1691            "a",
1692            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1693            false,
1694        )]);
1695
1696        // create some data
1697        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1698
1699        // Construct a buffer for value offsets, for the nested array:
1700        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
1701        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1702
1703        // Construct a list array from the above two
1704        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1705            DataType::Int32,
1706            false,
1707        ))))
1708        .len(5)
1709        .add_buffer(a_value_offsets)
1710        .add_child_data(a_values.into_data())
1711        .build()
1712        .unwrap();
1713        let a = ListArray::from(a_list_data);
1714
1715        // build a record batch
1716        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1717
1718        // This test fails if the max row group size is less than the batch's length
1719        // see https://github.com/apache/arrow-rs/issues/518
1720        assert_eq!(batch.column(0).null_count(), 0);
1721
1722        roundtrip(batch, None);
1723    }
1724
1725    #[test]
1726    fn arrow_writer_binary() {
1727        let string_field = Field::new("a", DataType::Utf8, false);
1728        let binary_field = Field::new("b", DataType::Binary, false);
1729        let schema = Schema::new(vec![string_field, binary_field]);
1730
1731        let raw_string_values = vec!["foo", "bar", "baz", "quux"];
1732        let raw_binary_values = [
1733            b"foo".to_vec(),
1734            b"bar".to_vec(),
1735            b"baz".to_vec(),
1736            b"quux".to_vec(),
1737        ];
1738        let raw_binary_value_refs = raw_binary_values
1739            .iter()
1740            .map(|x| x.as_slice())
1741            .collect::<Vec<_>>();
1742
1743        let string_values = StringArray::from(raw_string_values.clone());
1744        let binary_values = BinaryArray::from(raw_binary_value_refs);
1745        let batch = RecordBatch::try_new(
1746            Arc::new(schema),
1747            vec![Arc::new(string_values), Arc::new(binary_values)],
1748        )
1749        .unwrap();
1750
1751        roundtrip(batch, Some(SMALL_SIZE / 2));
1752    }
1753
1754    #[test]
1755    fn arrow_writer_binary_view() {
1756        let string_field = Field::new("a", DataType::Utf8View, false);
1757        let binary_field = Field::new("b", DataType::BinaryView, false);
1758        let nullable_string_field = Field::new("a", DataType::Utf8View, true);
1759        let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
1760
1761        let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
1762        let raw_binary_values = vec![
1763            b"foo".to_vec(),
1764            b"bar".to_vec(),
1765            b"large payload over 12 bytes".to_vec(),
1766            b"lulu".to_vec(),
1767        ];
1768        let nullable_string_values =
1769            vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
1770
1771        let string_view_values = StringViewArray::from(raw_string_values);
1772        let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
1773        let nullable_string_view_values = StringViewArray::from(nullable_string_values);
1774        let batch = RecordBatch::try_new(
1775            Arc::new(schema),
1776            vec![
1777                Arc::new(string_view_values),
1778                Arc::new(binary_view_values),
1779                Arc::new(nullable_string_view_values),
1780            ],
1781        )
1782        .unwrap();
1783
1784        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1785        roundtrip(batch, None);
1786    }
1787
1788    fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
1789        let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
1790        let schema = Schema::new(vec![decimal_field]);
1791
1792        let decimal_values = vec![10_000, 50_000, 0, -100]
1793            .into_iter()
1794            .map(Some)
1795            .collect::<Decimal128Array>()
1796            .with_precision_and_scale(precision, scale)
1797            .unwrap();
1798
1799        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
1800    }
1801
1802    #[test]
1803    fn arrow_writer_decimal() {
1804        // int32 to store the decimal value
1805        let batch_int32_decimal = get_decimal_batch(5, 2);
1806        roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
1807        // int64 to store the decimal value
1808        let batch_int64_decimal = get_decimal_batch(12, 2);
1809        roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
1810        // fixed_length_byte_array to store the decimal value
1811        let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
1812        roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
1813    }
1814
1815    #[test]
1816    fn arrow_writer_complex() {
1817        // define schema
1818        let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1819        let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1820        let struct_field_g = Arc::new(Field::new_list(
1821            "g",
1822            Field::new_list_field(DataType::Int16, true),
1823            false,
1824        ));
1825        let struct_field_h = Arc::new(Field::new_list(
1826            "h",
1827            Field::new_list_field(DataType::Int16, false),
1828            true,
1829        ));
1830        let struct_field_e = Arc::new(Field::new_struct(
1831            "e",
1832            vec![
1833                struct_field_f.clone(),
1834                struct_field_g.clone(),
1835                struct_field_h.clone(),
1836            ],
1837            false,
1838        ));
1839        let schema = Schema::new(vec![
1840            Field::new("a", DataType::Int32, false),
1841            Field::new("b", DataType::Int32, true),
1842            Field::new_struct(
1843                "c",
1844                vec![struct_field_d.clone(), struct_field_e.clone()],
1845                false,
1846            ),
1847        ]);
1848
1849        // create some data
1850        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1851        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1852        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1853        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1854
1855        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1856
1857        // Construct a buffer for value offsets, for the nested array:
1858        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
1859        let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1860
1861        // Construct a list array from the above two
1862        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1863            .len(5)
1864            .add_buffer(g_value_offsets.clone())
1865            .add_child_data(g_value.to_data())
1866            .build()
1867            .unwrap();
1868        let g = ListArray::from(g_list_data);
1869        // The difference between g and h is that h has a null bitmap
1870        let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
1871            .len(5)
1872            .add_buffer(g_value_offsets)
1873            .add_child_data(g_value.to_data())
1874            .null_bit_buffer(Some(Buffer::from([0b00011011])))
1875            .build()
1876            .unwrap();
1877        let h = ListArray::from(h_list_data);
1878
1879        let e = StructArray::from(vec![
1880            (struct_field_f, Arc::new(f) as ArrayRef),
1881            (struct_field_g, Arc::new(g) as ArrayRef),
1882            (struct_field_h, Arc::new(h) as ArrayRef),
1883        ]);
1884
1885        let c = StructArray::from(vec![
1886            (struct_field_d, Arc::new(d) as ArrayRef),
1887            (struct_field_e, Arc::new(e) as ArrayRef),
1888        ]);
1889
1890        // build a record batch
1891        let batch = RecordBatch::try_new(
1892            Arc::new(schema),
1893            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
1894        )
1895        .unwrap();
1896
1897        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1898        roundtrip(batch, Some(SMALL_SIZE / 3));
1899    }
1900
1901    #[test]
1902    fn arrow_writer_complex_mixed() {
1903        // This test was added while investigating https://github.com/apache/arrow-rs/issues/244.
1904        // It was subsequently fixed while investigating https://github.com/apache/arrow-rs/issues/245.
1905
1906        // define schema
1907        let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
1908        let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
1909        let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
1910        let schema = Schema::new(vec![Field::new(
1911            "some_nested_object",
1912            DataType::Struct(Fields::from(vec![
1913                offset_field.clone(),
1914                partition_field.clone(),
1915                topic_field.clone(),
1916            ])),
1917            false,
1918        )]);
1919
1920        // create some data
1921        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1922        let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1923        let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
1924
1925        let some_nested_object = StructArray::from(vec![
1926            (offset_field, Arc::new(offset) as ArrayRef),
1927            (partition_field, Arc::new(partition) as ArrayRef),
1928            (topic_field, Arc::new(topic) as ArrayRef),
1929        ]);
1930
1931        // build a record batch
1932        let batch =
1933            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1934
1935        roundtrip(batch, Some(SMALL_SIZE / 2));
1936    }
1937
1938    #[test]
1939    fn arrow_writer_map() {
1940        // Note: we are using the JSON Arrow reader for brevity
1941        let json_content = r#"
1942        {"stocks":{"long": "$AAA", "short": "$BBB"}}
1943        {"stocks":{"long": null, "long": "$CCC", "short": null}}
1944        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1945        "#;
1946        let entries_struct_type = DataType::Struct(Fields::from(vec![
1947            Field::new("key", DataType::Utf8, false),
1948            Field::new("value", DataType::Utf8, true),
1949        ]));
1950        let stocks_field = Field::new(
1951            "stocks",
1952            DataType::Map(
1953                Arc::new(Field::new("entries", entries_struct_type, false)),
1954                false,
1955            ),
1956            true,
1957        );
1958        let schema = Arc::new(Schema::new(vec![stocks_field]));
1959        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
1960        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
1961
1962        let batch = reader.next().unwrap().unwrap();
1963        roundtrip(batch, None);
1964    }
1965
1966    #[test]
1967    fn arrow_writer_2_level_struct() {
1968        // tests writing <struct<struct<primitive>>>
1969        let field_c = Field::new("c", DataType::Int32, true);
1970        let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
1971        let type_a = DataType::Struct(vec![field_b.clone()].into());
1972        let field_a = Field::new("a", type_a, true);
1973        let schema = Schema::new(vec![field_a.clone()]);
1974
1975        // create data
1976        let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
1977        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
1978            .len(6)
1979            .null_bit_buffer(Some(Buffer::from([0b00100111])))
1980            .add_child_data(c.into_data())
1981            .build()
1982            .unwrap();
1983        let b = StructArray::from(b_data);
1984        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
1985            .len(6)
1986            .null_bit_buffer(Some(Buffer::from([0b00101111])))
1987            .add_child_data(b.into_data())
1988            .build()
1989            .unwrap();
1990        let a = StructArray::from(a_data);
1991
1992        assert_eq!(a.null_count(), 1);
1993        assert_eq!(a.column(0).null_count(), 2);
1994
1995        // build a record batch
1996        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1997
1998        roundtrip(batch, Some(SMALL_SIZE / 2));
1999    }
2000
2001    #[test]
2002    fn arrow_writer_2_level_struct_non_null() {
2003        // tests writing <struct<struct<primitive>>>
2004        let field_c = Field::new("c", DataType::Int32, false);
2005        let type_b = DataType::Struct(vec![field_c].into());
2006        let field_b = Field::new("b", type_b.clone(), false);
2007        let type_a = DataType::Struct(vec![field_b].into());
2008        let field_a = Field::new("a", type_a.clone(), false);
2009        let schema = Schema::new(vec![field_a]);
2010
2011        // create data
2012        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2013        let b_data = ArrayDataBuilder::new(type_b)
2014            .len(6)
2015            .add_child_data(c.into_data())
2016            .build()
2017            .unwrap();
2018        let b = StructArray::from(b_data);
2019        let a_data = ArrayDataBuilder::new(type_a)
2020            .len(6)
2021            .add_child_data(b.into_data())
2022            .build()
2023            .unwrap();
2024        let a = StructArray::from(a_data);
2025
2026        assert_eq!(a.null_count(), 0);
2027        assert_eq!(a.column(0).null_count(), 0);
2028
2029        // build a record batch
2030        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2031
2032        roundtrip(batch, Some(SMALL_SIZE / 2));
2033    }
2034
2035    #[test]
2036    fn arrow_writer_2_level_struct_mixed_null() {
2037        // tests writing <struct<struct<primitive>>>
2038        let field_c = Field::new("c", DataType::Int32, false);
2039        let type_b = DataType::Struct(vec![field_c].into());
2040        let field_b = Field::new("b", type_b.clone(), true);
2041        let type_a = DataType::Struct(vec![field_b].into());
2042        let field_a = Field::new("a", type_a.clone(), false);
2043        let schema = Schema::new(vec![field_a]);
2044
2045        // create data
2046        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2047        let b_data = ArrayDataBuilder::new(type_b)
2048            .len(6)
2049            .null_bit_buffer(Some(Buffer::from([0b00100111])))
2050            .add_child_data(c.into_data())
2051            .build()
2052            .unwrap();
2053        let b = StructArray::from(b_data);
2054        // `a` intentionally has no null buffer, to test that this is handled correctly
2055        let a_data = ArrayDataBuilder::new(type_a)
2056            .len(6)
2057            .add_child_data(b.into_data())
2058            .build()
2059            .unwrap();
2060        let a = StructArray::from(a_data);
2061
2062        assert_eq!(a.null_count(), 0);
2063        assert_eq!(a.column(0).null_count(), 2);
2064
2065        // build a record batch
2066        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2067
2068        roundtrip(batch, Some(SMALL_SIZE / 2));
2069    }
2070
2071    #[test]
2072    fn arrow_writer_2_level_struct_mixed_null_2() {
2073        // tests writing <struct<struct<primitive>>>, where the primitive columns are non-null.
2074        let field_c = Field::new("c", DataType::Int32, false);
2075        let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
2076        let field_e = Field::new(
2077            "e",
2078            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2079            false,
2080        );
2081
2082        let field_b = Field::new(
2083            "b",
2084            DataType::Struct(vec![field_c, field_d, field_e].into()),
2085            false,
2086        );
2087        let type_a = DataType::Struct(vec![field_b.clone()].into());
2088        let field_a = Field::new("a", type_a, true);
2089        let schema = Schema::new(vec![field_a.clone()]);
2090
2091        // create data
2092        let c = Int32Array::from_iter_values(0..6);
2093        let d = FixedSizeBinaryArray::try_from_iter(
2094            ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
2095        )
2096        .expect("four byte values");
2097        let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
2098        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2099            .len(6)
2100            .add_child_data(c.into_data())
2101            .add_child_data(d.into_data())
2102            .add_child_data(e.into_data())
2103            .build()
2104            .unwrap();
2105        let b = StructArray::from(b_data);
2106        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2107            .len(6)
2108            .null_bit_buffer(Some(Buffer::from([0b00100101])))
2109            .add_child_data(b.into_data())
2110            .build()
2111            .unwrap();
2112        let a = StructArray::from(a_data);
2113
2114        assert_eq!(a.null_count(), 3);
2115        assert_eq!(a.column(0).null_count(), 0);
2116
2117        // build a record batch
2118        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2119
2120        roundtrip(batch, Some(SMALL_SIZE / 2));
2121    }
2122
2123    #[test]
2124    fn test_fixed_size_binary_in_dict() {
2125        fn test_fixed_size_binary_in_dict_inner<K>()
2126        where
2127            K: ArrowDictionaryKeyType,
2128            K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
2129            <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
2130        {
2131            let field = Field::new(
2132                "a",
2133                DataType::Dictionary(
2134                    Box::new(K::DATA_TYPE),
2135                    Box::new(DataType::FixedSizeBinary(4)),
2136                ),
2137                false,
2138            );
2139            let schema = Schema::new(vec![field]);
2140
2141            let keys: Vec<K::Native> = vec![
2142                K::Native::try_from(0u8).unwrap(),
2143                K::Native::try_from(0u8).unwrap(),
2144                K::Native::try_from(1u8).unwrap(),
2145            ];
2146            let keys = PrimitiveArray::<K>::from_iter_values(keys);
2147            let values = FixedSizeBinaryArray::try_from_iter(
2148                vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
2149            )
2150            .unwrap();
2151
2152            let data = DictionaryArray::<K>::new(keys, Arc::new(values));
2153            let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
2154            roundtrip(batch, None);
2155        }
2156
2157        test_fixed_size_binary_in_dict_inner::<UInt8Type>();
2158        test_fixed_size_binary_in_dict_inner::<UInt16Type>();
2159        test_fixed_size_binary_in_dict_inner::<UInt32Type>();
2160        test_fixed_size_binary_in_dict_inner::<UInt64Type>();
2161        test_fixed_size_binary_in_dict_inner::<Int8Type>();
2162        test_fixed_size_binary_in_dict_inner::<Int16Type>();
2163        test_fixed_size_binary_in_dict_inner::<Int32Type>();
2164        test_fixed_size_binary_in_dict_inner::<Int64Type>();
2165    }
2166
2167    #[test]
2168    fn test_empty_dict() {
2169        let struct_fields = Fields::from(vec![Field::new(
2170            "dict",
2171            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2172            false,
2173        )]);
2174
2175        let schema = Schema::new(vec![Field::new_struct(
2176            "struct",
2177            struct_fields.clone(),
2178            true,
2179        )]);
2180        let dictionary = Arc::new(DictionaryArray::new(
2181            Int32Array::new_null(5),
2182            Arc::new(StringArray::new_null(0)),
2183        ));
2184
2185        let s = StructArray::new(
2186            struct_fields,
2187            vec![dictionary],
2188            Some(NullBuffer::new_null(5)),
2189        );
2190
2191        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
2192        roundtrip(batch, None);
2193    }

2194    #[test]
2195    fn arrow_writer_page_size() {
2196        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
2197
2198        let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
2199
2200        // Generate an array of 10 unique 10-character strings
2201        for i in 0..10 {
2202            let value = i
2203                .to_string()
2204                .repeat(10)
2205                .chars()
2206                .take(10)
2207                .collect::<String>();
2208
2209            builder.append_value(value);
2210        }
2211
2212        let array = Arc::new(builder.finish());
2213
2214        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
2215
2216        let file = tempfile::tempfile().unwrap();
2217
2218        // Set everything very low so we fall back to PLAIN encoding after the first row
2219        let props = WriterProperties::builder()
2220            .set_data_page_size_limit(1)
2221            .set_dictionary_page_size_limit(1)
2222            .set_write_batch_size(1)
2223            .build();
2224
2225        let mut writer =
2226            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
2227                .expect("Unable to write file");
2228        writer.write(&batch).unwrap();
2229        writer.close().unwrap();
2230
2231        let options = ReadOptionsBuilder::new().with_page_index().build();
2232        let reader =
2233            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap();
2234
2235        let column = reader.metadata().row_group(0).columns();
2236
2237        assert_eq!(column.len(), 1);
2238
2239        // We should write one row before falling back to PLAIN encoding so there should still be a
2240        // dictionary page.
2241        assert!(
2242            column[0].dictionary_page_offset().is_some(),
2243            "Expected a dictionary page"
2244        );
2245
2246        assert!(reader.metadata().offset_index().is_some());
2247        let offset_indexes = &reader.metadata().offset_index().unwrap()[0];
2248
2249        let page_locations = offset_indexes[0].page_locations.clone();
2250
2251        // We should fall back to PLAIN encoding after the first row, and our max page size is 1 byte,
2252        // so we expect one dictionary-encoded page and then a page per row thereafter.
2253        assert_eq!(
2254            page_locations.len(),
2255            10,
2256            "Expected 10 pages but got {page_locations:#?}"
2257        );
2258    }
2259
2260    #[test]
2261    fn arrow_writer_float_nans() {
2262        let f16_field = Field::new("a", DataType::Float16, false);
2263        let f32_field = Field::new("b", DataType::Float32, false);
2264        let f64_field = Field::new("c", DataType::Float64, false);
2265        let schema = Schema::new(vec![f16_field, f32_field, f64_field]);
2266
2267        let f16_values = (0..MEDIUM_SIZE)
2268            .map(|i| {
2269                Some(if i % 2 == 0 {
2270                    f16::NAN
2271                } else {
2272                    f16::from_f32(i as f32)
2273                })
2274            })
2275            .collect::<Float16Array>();
2276
2277        let f32_values = (0..MEDIUM_SIZE)
2278            .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
2279            .collect::<Float32Array>();
2280
2281        let f64_values = (0..MEDIUM_SIZE)
2282            .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
2283            .collect::<Float64Array>();
2284
2285        let batch = RecordBatch::try_new(
2286            Arc::new(schema),
2287            vec![
2288                Arc::new(f16_values),
2289                Arc::new(f32_values),
2290                Arc::new(f64_values),
2291            ],
2292        )
2293        .unwrap();
2294
2295        roundtrip(batch, None);
2296    }
2297
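    // Row counts used by the round-trip tests in this module.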
2298    const SMALL_SIZE: usize = 7;
2299    const MEDIUM_SIZE: usize = 63;
2300
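    // Round trips `expected_batch` with both writer versions (PARQUET_1_0 and PARQUET_2_0),
    // optionally capping the row group size, and returns the encoded in-memory files.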
2301    fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<Bytes> {
2302        let mut files = vec![];
2303        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2304            let mut props = WriterProperties::builder().set_writer_version(version);
2305
2306            if let Some(size) = max_row_group_size {
2307                props = props.set_max_row_group_size(size)
2308            }
2309
2310            let props = props.build();
2311            files.push(roundtrip_opts(&expected_batch, props))
2312        }
2313        files
2314    }
2315
2316    // Round trip the specified record batch with the specified writer properties,
2317    // to an in-memory file, and validate the arrays using the specified function.
2318    // Returns the in-memory file.
2319    fn roundtrip_opts_with_array_validation<F>(
2320        expected_batch: &RecordBatch,
2321        props: WriterProperties,
2322        validate: F,
2323    ) -> Bytes
2324    where
2325        F: Fn(&ArrayData, &ArrayData),
2326    {
2327        let mut file = vec![];
2328
2329        let mut writer = ArrowWriter::try_new(&mut file, expected_batch.schema(), Some(props))
2330            .expect("Unable to write file");
2331        writer.write(expected_batch).unwrap();
2332        writer.close().unwrap();
2333
2334        let file = Bytes::from(file);
2335        let mut record_batch_reader =
2336            ParquetRecordBatchReader::try_new(file.clone(), 1024).unwrap();
2337
2338        let actual_batch = record_batch_reader
2339            .next()
2340            .expect("No batch found")
2341            .expect("Unable to get batch");
2342
2343        assert_eq!(expected_batch.schema(), actual_batch.schema());
2344        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
2345        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
2346        for i in 0..expected_batch.num_columns() {
2347            let expected_data = expected_batch.column(i).to_data();
2348            let actual_data = actual_batch.column(i).to_data();
2349            validate(&expected_data, &actual_data);
2350        }
2351
2352        file
2353    }
2354
2355    fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> Bytes {
2356        roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
2357            a.validate_full().expect("valid expected data");
2358            b.validate_full().expect("valid actual data");
2359            assert_eq!(a, b)
2360        })
2361    }
2362
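    // Options for a single-column round trip: the values to write, the schema to use, and
    // whether (and where) to write bloom filters.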
2363    struct RoundTripOptions {
2364        values: ArrayRef,
2365        schema: SchemaRef,
2366        bloom_filter: bool,
2367        bloom_filter_position: BloomFilterPosition,
2368    }
2369
2370    impl RoundTripOptions {
2371        fn new(values: ArrayRef, nullable: bool) -> Self {
2372            let data_type = values.data_type().clone();
2373            let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
2374            Self {
2375                values,
2376                schema: Arc::new(schema),
2377                bloom_filter: false,
2378                bloom_filter_position: BloomFilterPosition::AfterRowGroup,
2379            }
2380        }
2381    }
2382
2383    fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<Bytes> {
2384        one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
2385    }
2386
2387    fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<Bytes> {
2388        let mut options = RoundTripOptions::new(values, false);
2389        options.schema = schema;
2390        one_column_roundtrip_with_options(options)
2391    }
2392
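    // Round trips a single-column batch across a matrix of dictionary settings, encodings
    // (chosen per data type), writer versions and row group sizes, returning every encoded file.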
2393    fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<Bytes> {
2394        let RoundTripOptions {
2395            values,
2396            schema,
2397            bloom_filter,
2398            bloom_filter_position,
2399        } = options;
2400
2401        let encodings = match values.data_type() {
2402            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
2403                vec![
2404                    Encoding::PLAIN,
2405                    Encoding::DELTA_BYTE_ARRAY,
2406                    Encoding::DELTA_LENGTH_BYTE_ARRAY,
2407                ]
2408            }
2409            DataType::Int64
2410            | DataType::Int32
2411            | DataType::Int16
2412            | DataType::Int8
2413            | DataType::UInt64
2414            | DataType::UInt32
2415            | DataType::UInt16
2416            | DataType::UInt8 => vec![
2417                Encoding::PLAIN,
2418                Encoding::DELTA_BINARY_PACKED,
2419                Encoding::BYTE_STREAM_SPLIT,
2420            ],
2421            DataType::Float32 | DataType::Float64 => {
2422                vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
2423            }
2424            _ => vec![Encoding::PLAIN],
2425        };
2426
2427        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2428
2429        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2430
2431        let mut files = vec![];
2432        for dictionary_size in [0, 1, 1024] {
2433            for encoding in &encodings {
2434                for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2435                    for row_group_size in row_group_sizes {
2436                        let props = WriterProperties::builder()
2437                            .set_writer_version(version)
2438                            .set_max_row_group_size(row_group_size)
2439                            .set_dictionary_enabled(dictionary_size != 0)
2440                            .set_dictionary_page_size_limit(dictionary_size.max(1))
2441                            .set_encoding(*encoding)
2442                            .set_bloom_filter_enabled(bloom_filter)
2443                            .set_bloom_filter_position(bloom_filter_position)
2444                            .build();
2445
2446                        files.push(roundtrip_opts(&expected_batch, props))
2447                    }
2448                }
2449            }
2450        }
2451        files
2452    }
2453
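    // Builds a non-nullable array of type `A` from `iter` and round trips it.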
2454    fn values_required<A, I>(iter: I) -> Vec<Bytes>
2455    where
2456        A: From<Vec<I::Item>> + Array + 'static,
2457        I: IntoIterator,
2458    {
2459        let raw_values: Vec<_> = iter.into_iter().collect();
2460        let values = Arc::new(A::from(raw_values));
2461        one_column_roundtrip(values, false)
2462    }
2463
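    // Builds a nullable array of type `A` from `iter`, nulling every other element, and round trips it.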
2464    fn values_optional<A, I>(iter: I) -> Vec<Bytes>
2465    where
2466        A: From<Vec<Option<I::Item>>> + Array + 'static,
2467        I: IntoIterator,
2468    {
2469        let optional_raw_values: Vec<_> = iter
2470            .into_iter()
2471            .enumerate()
2472            .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
2473            .collect();
2474        let optional_values = Arc::new(A::from(optional_raw_values));
2475        one_column_roundtrip(optional_values, true)
2476    }
2477
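    // Runs both the required (non-nullable) and optional (nullable) round trips for the same input.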
2478    fn required_and_optional<A, I>(iter: I)
2479    where
2480        A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
2481        I: IntoIterator + Clone,
2482    {
2483        values_required::<A, I>(iter.clone());
2484        values_optional::<A, I>(iter);
2485    }
2486
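    // Reads the first written file back with bloom filter reading enabled, then asserts that each
    // positive value is present in some row group's filter and that no negative value is.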
2487    fn check_bloom_filter<T: AsBytes>(
2488        files: Vec<Bytes>,
2489        file_column: String,
2490        positive_values: Vec<T>,
2491        negative_values: Vec<T>,
2492    ) {
2493        files.into_iter().take(1).for_each(|file| {
2494            let file_reader = SerializedFileReader::new_with_options(
2495                file,
2496                ReadOptionsBuilder::new()
2497                    .with_reader_properties(
2498                        ReaderProperties::builder()
2499                            .set_read_bloom_filter(true)
2500                            .build(),
2501                    )
2502                    .build(),
2503            )
2504            .expect("Unable to open file as Parquet");
2505            let metadata = file_reader.metadata();
2506
2507            // Gets bloom filters from all row groups.
2508            let mut bloom_filters: Vec<_> = vec![];
2509            for (ri, row_group) in metadata.row_groups().iter().enumerate() {
2510                if let Some((column_index, _)) = row_group
2511                    .columns()
2512                    .iter()
2513                    .enumerate()
2514                    .find(|(_, column)| column.column_path().string() == file_column)
2515                {
2516                    let row_group_reader = file_reader
2517                        .get_row_group(ri)
2518                        .expect("Unable to read row group");
2519                    if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
2520                        bloom_filters.push(sbbf.clone());
2521                    } else {
2522                        panic!("No bloom filter for column named {file_column} found");
2523                    }
2524                } else {
2525                    panic!("No column named {file_column} found");
2526                }
2527            }
2528
2529            positive_values.iter().for_each(|value| {
2530                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2531                assert!(
2532                    found.is_some(),
2533                    "Value {:?} should be in bloom filter",
2534                    value.as_bytes()
2535                );
2536            });
2537
2538            negative_values.iter().for_each(|value| {
2539                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2540                assert!(
2541                    found.is_none(),
2542                    "Value {:?} should not be in bloom filter",
2543                    value.as_bytes()
2544                );
2545            });
2546        });
2547    }
2548
2549    #[test]
2550    fn all_null_primitive_single_column() {
2551        let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
2552        one_column_roundtrip(values, true);
2553    }

2554    #[test]
2555    fn null_single_column() {
2556        let values = Arc::new(NullArray::new(SMALL_SIZE));
2557        one_column_roundtrip(values, true);
2558        // null arrays are always nullable, so a test with non-nullable nulls fails
2559    }
2560
2561    #[test]
2562    fn bool_single_column() {
2563        required_and_optional::<BooleanArray, _>(
2564            [true, false].iter().cycle().copied().take(SMALL_SIZE),
2565        );
2566    }
2567
2568    #[test]
2569    fn bool_large_single_column() {
2570        let values = Arc::new(
2571            [None, Some(true), Some(false)]
2572                .iter()
2573                .cycle()
2574                .copied()
2575                .take(200_000)
2576                .collect::<BooleanArray>(),
2577        );
2578        let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
2579        let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2580        let file = tempfile::tempfile().unwrap();
2581
2582        let mut writer =
2583            ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
2584                .expect("Unable to write file");
2585        writer.write(&expected_batch).unwrap();
2586        writer.close().unwrap();
2587    }
2588
2589    #[test]
2590    fn check_page_offset_index_with_nan() {
2591        let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
2592        let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
2593        let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2594
2595        let mut out = Vec::with_capacity(1024);
2596        let mut writer =
2597            ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
2598        writer.write(&batch).unwrap();
2599        let file_meta_data = writer.close().unwrap();
2600        for row_group in file_meta_data.row_groups() {
2601            for column in row_group.columns() {
2602                assert!(column.offset_index_offset().is_some());
2603                assert!(column.offset_index_length().is_some());
2604                assert!(column.column_index_offset().is_none());
2605                assert!(column.column_index_length().is_none());
2606            }
2607        }
2608    }
2609
2610    #[test]
2611    fn i8_single_column() {
2612        required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
2613    }
2614
2615    #[test]
2616    fn i16_single_column() {
2617        required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
2618    }
2619
2620    #[test]
2621    fn i32_single_column() {
2622        required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
2623    }
2624
2625    #[test]
2626    fn i64_single_column() {
2627        required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
2628    }
2629
2630    #[test]
2631    fn u8_single_column() {
2632        required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
2633    }
2634
2635    #[test]
2636    fn u16_single_column() {
2637        required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
2638    }
2639
2640    #[test]
2641    fn u32_single_column() {
2642        required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
2643    }
2644
2645    #[test]
2646    fn u64_single_column() {
2647        required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
2648    }
2649
2650    #[test]
2651    fn f32_single_column() {
2652        required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
2653    }
2654
2655    #[test]
2656    fn f64_single_column() {
2657        required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
2658    }
2659
2660    // The timestamp array types don't implement From<Vec<T>> because they need the timezone
2661    // argument, and they also don't support building from a Vec<Option<T>>, so call
2662    // one_column_roundtrip manually instead of calling required_and_optional for these tests.
2663
2664    #[test]
2665    fn timestamp_second_single_column() {
2666        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2667        let values = Arc::new(TimestampSecondArray::from(raw_values));
2668
2669        one_column_roundtrip(values, false);
2670    }
2671
2672    #[test]
2673    fn timestamp_millisecond_single_column() {
2674        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2675        let values = Arc::new(TimestampMillisecondArray::from(raw_values));
2676
2677        one_column_roundtrip(values, false);
2678    }
2679
2680    #[test]
2681    fn timestamp_microsecond_single_column() {
2682        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2683        let values = Arc::new(TimestampMicrosecondArray::from(raw_values));
2684
2685        one_column_roundtrip(values, false);
2686    }
2687
2688    #[test]
2689    fn timestamp_nanosecond_single_column() {
2690        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2691        let values = Arc::new(TimestampNanosecondArray::from(raw_values));
2692
2693        one_column_roundtrip(values, false);
2694    }
2695
2696    #[test]
2697    fn date32_single_column() {
2698        required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
2699    }
2700
2701    #[test]
2702    fn date64_single_column() {
2703        // Date64 must be a multiple of 86400000, see ARROW-10925
2704        required_and_optional::<Date64Array, _>(
2705            (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
2706        );
2707    }
2708
2709    #[test]
2710    fn time32_second_single_column() {
2711        required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
2712    }
2713
2714    #[test]
2715    fn time32_millisecond_single_column() {
2716        required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
2717    }
2718
2719    #[test]
2720    fn time64_microsecond_single_column() {
2721        required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
2722    }
2723
2724    #[test]
2725    fn time64_nanosecond_single_column() {
2726        required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
2727    }
2728
2729    #[test]
2730    fn duration_second_single_column() {
2731        required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
2732    }
2733
2734    #[test]
2735    fn duration_millisecond_single_column() {
2736        required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
2737    }
2738
2739    #[test]
2740    fn duration_microsecond_single_column() {
2741        required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
2742    }
2743
2744    #[test]
2745    fn duration_nanosecond_single_column() {
2746        required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
2747    }
2748
2749    #[test]
2750    fn interval_year_month_single_column() {
2751        required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
2752    }
2753
2754    #[test]
2755    fn interval_day_time_single_column() {
2756        required_and_optional::<IntervalDayTimeArray, _>(vec![
2757            IntervalDayTime::new(0, 1),
2758            IntervalDayTime::new(0, 3),
2759            IntervalDayTime::new(3, -2),
2760            IntervalDayTime::new(-200, 4),
2761        ]);
2762    }
2763
2764    #[test]
2765    #[should_panic(
2766        expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
2767    )]
2768    fn interval_month_day_nano_single_column() {
2769        required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
2770            IntervalMonthDayNano::new(0, 1, 5),
2771            IntervalMonthDayNano::new(0, 3, 2),
2772            IntervalMonthDayNano::new(3, -2, -5),
2773            IntervalMonthDayNano::new(-200, 4, -1),
2774        ]);
2775    }
2776
2777    #[test]
2778    fn binary_single_column() {
2779        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2780        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2781        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2782
2783        // BinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2784        values_required::<BinaryArray, _>(many_vecs_iter);
2785    }
2786
2787    #[test]
2788    fn binary_view_single_column() {
2789        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2790        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2791        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2792
2793        // BinaryViewArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2794        values_required::<BinaryViewArray, _>(many_vecs_iter);
2795    }
2796
2797    #[test]
2798    fn i32_column_bloom_filter_at_end() {
2799        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2800        let mut options = RoundTripOptions::new(array, false);
2801        options.bloom_filter = true;
2802        options.bloom_filter_position = BloomFilterPosition::End;
2803
2804        let files = one_column_roundtrip_with_options(options);
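        // The written values 0..SMALL_SIZE should be reported as present by the bloom filter,
        // while values beyond the written range should be reported as absent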
2805        check_bloom_filter(
2806            files,
2807            "col".to_string(),
2808            (0..SMALL_SIZE as i32).collect(),
2809            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2810        );
2811    }
2812
2813    #[test]
2814    fn i32_column_bloom_filter() {
2815        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2816        let mut options = RoundTripOptions::new(array, false);
2817        options.bloom_filter = true;
2818
2819        let files = one_column_roundtrip_with_options(options);
2820        check_bloom_filter(
2821            files,
2822            "col".to_string(),
2823            (0..SMALL_SIZE as i32).collect(),
2824            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2825        );
2826    }
2827
2828    #[test]
2829    fn binary_column_bloom_filter() {
2830        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2831        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2832        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2833
2834        let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
2835        let mut options = RoundTripOptions::new(array, false);
2836        options.bloom_filter = true;
2837
2838        let files = one_column_roundtrip_with_options(options);
2839        check_bloom_filter(
2840            files,
2841            "col".to_string(),
2842            many_vecs,
2843            vec![vec![(SMALL_SIZE + 1) as u8]],
2844        );
2845    }
2846
2847    #[test]
2848    fn empty_string_null_column_bloom_filter() {
2849        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2850        let raw_strs = raw_values.iter().map(|s| s.as_str());
2851
2852        let array = Arc::new(StringArray::from_iter_values(raw_strs));
2853        let mut options = RoundTripOptions::new(array, false);
2854        options.bloom_filter = true;
2855
2856        let files = one_column_roundtrip_with_options(options);
2857
2858        let optional_raw_values: Vec<_> = raw_values
2859            .iter()
2860            .enumerate()
2861            .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
2862            .collect();
2863        // For null slots, the empty string should not be present in the bloom filter.
2864        check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
2865    }
2866
2867    #[test]
2868    fn large_binary_single_column() {
2869        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2870        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2871        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2872
2873        // LargeBinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2874        values_required::<LargeBinaryArray, _>(many_vecs_iter);
2875    }
2876
2877    #[test]
2878    fn fixed_size_binary_single_column() {
2879        let mut builder = FixedSizeBinaryBuilder::new(4);
2880        builder.append_value(b"0123").unwrap();
2881        builder.append_null();
2882        builder.append_value(b"8910").unwrap();
2883        builder.append_value(b"1112").unwrap();
2884        let array = Arc::new(builder.finish());
2885
2886        one_column_roundtrip(array, true);
2887    }
2888
2889    #[test]
2890    fn string_single_column() {
2891        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2892        let raw_strs = raw_values.iter().map(|s| s.as_str());
2893
2894        required_and_optional::<StringArray, _>(raw_strs);
2895    }
2896
2897    #[test]
2898    fn large_string_single_column() {
2899        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2900        let raw_strs = raw_values.iter().map(|s| s.as_str());
2901
2902        required_and_optional::<LargeStringArray, _>(raw_strs);
2903    }
2904
2905    #[test]
2906    fn string_view_single_column() {
2907        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2908        let raw_strs = raw_values.iter().map(|s| s.as_str());
2909
2910        required_and_optional::<StringViewArray, _>(raw_strs);
2911    }
2912
2913    #[test]
2914    fn null_list_single_column() {
2915        let null_field = Field::new_list_field(DataType::Null, true);
2916        let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);
2917
2918        let schema = Schema::new(vec![list_field]);
2919
2920        // Build [[], null, [null, null]]
2921        let a_values = NullArray::new(2);
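        // offsets [0, 0, 0, 2] give list lengths 0, 0 and 2; validity 0b00000101 marks row 1 as null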
2922        let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
2923        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2924            DataType::Null,
2925            true,
2926        ))))
2927        .len(3)
2928        .add_buffer(a_value_offsets)
2929        .null_bit_buffer(Some(Buffer::from([0b00000101])))
2930        .add_child_data(a_values.into_data())
2931        .build()
2932        .unwrap();
2933
2934        let a = ListArray::from(a_list_data);
2935
2936        assert!(a.is_valid(0));
2937        assert!(!a.is_valid(1));
2938        assert!(a.is_valid(2));
2939
2940        assert_eq!(a.value(0).len(), 0);
2941        assert_eq!(a.value(2).len(), 2);
2942        assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);
2943
2944        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2945        roundtrip(batch, None);
2946    }
2947
2948    #[test]
2949    fn list_single_column() {
2950        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
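        // offsets [0, 1, 3, 3, 6, 10] define five lists; validity 0b00011011 marks index 2 as null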
2951        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2952        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2953            DataType::Int32,
2954            false,
2955        ))))
2956        .len(5)
2957        .add_buffer(a_value_offsets)
2958        .null_bit_buffer(Some(Buffer::from([0b00011011])))
2959        .add_child_data(a_values.into_data())
2960        .build()
2961        .unwrap();
2962
2963        assert_eq!(a_list_data.null_count(), 1);
2964
2965        let a = ListArray::from(a_list_data);
2966        let values = Arc::new(a);
2967
2968        one_column_roundtrip(values, true);
2969    }
2970
2971    #[test]
2972    fn large_list_single_column() {
2973        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2974        let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
2975        let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
2976            "large_item",
2977            DataType::Int32,
2978            true,
2979        ))))
2980        .len(5)
2981        .add_buffer(a_value_offsets)
2982        .add_child_data(a_values.into_data())
2983        .null_bit_buffer(Some(Buffer::from([0b00011011])))
2984        .build()
2985        .unwrap();
2986
2987        // The validity bitmap 0b00011011 marks index 2 as null, so the list should report one null
2988        assert_eq!(a_list_data.null_count(), 1);
2989
2990        let a = LargeListArray::from(a_list_data);
2991        let values = Arc::new(a);
2992
2993        one_column_roundtrip(values, true);
2994    }
2995
2996    #[test]
2997    fn list_nested_nulls() {
2998        use arrow::datatypes::Int32Type;
2999        let data = vec![
3000            Some(vec![Some(1)]),
3001            Some(vec![Some(2), Some(3)]),
3002            None,
3003            Some(vec![Some(4), Some(5), None]),
3004            Some(vec![None]),
3005            Some(vec![Some(6), Some(7)]),
3006        ];
3007
3008        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
3009        one_column_roundtrip(Arc::new(list), true);
3010
3011        let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
3012        one_column_roundtrip(Arc::new(list), true);
3013    }
3014
3015    #[test]
3016    fn struct_single_column() {
3017        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3018        let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
3019        let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
3020
3021        let values = Arc::new(s);
3022        one_column_roundtrip(values, false);
3023    }
3024
3025    #[test]
3026    fn list_and_map_coerced_names() {
3027        // Create map and list with non-Parquet naming
3028        let list_field =
3029            Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
3030        let map_field = Field::new_map(
3031            "my_map",
3032            "entries",
3033            Field::new("keys", DataType::Int32, false),
3034            Field::new("values", DataType::Int32, true),
3035            false,
3036            true,
3037        );
3038
3039        let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
3040        let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();
3041
3042        let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
3043
3044        // Write data to Parquet but coerce names to match spec
3045        let props = Some(WriterProperties::builder().set_coerce_types(true).build());
3046        let file = tempfile::tempfile().unwrap();
3047        let mut writer =
3048            ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();
3049
3050        let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
3051        writer.write(&batch).unwrap();
3052        let file_metadata = writer.close().unwrap();
3053
3054        let schema = file_metadata.file_metadata().schema();
3055        // Coerced name of "item" should be "element"
3056        let list_field = &schema.get_fields()[0].get_fields()[0];
3057        assert_eq!(list_field.get_fields()[0].name(), "element");
3058
3059        let map_field = &schema.get_fields()[1].get_fields()[0];
3060        // Coerced name of "entries" should be "key_value"
3061        assert_eq!(map_field.name(), "key_value");
3062        // Coerced name of "keys" should be "key"
3063        assert_eq!(map_field.get_fields()[0].name(), "key");
3064        // Coerced name of "values" should be "value"
3065        assert_eq!(map_field.get_fields()[1].name(), "value");
3066
3067        // Double check schema after reading from the file
3068        let reader = SerializedFileReader::new(file).unwrap();
3069        let file_schema = reader.metadata().file_metadata().schema();
3070        let fields = file_schema.get_fields();
3071        let list_field = &fields[0].get_fields()[0];
3072        assert_eq!(list_field.get_fields()[0].name(), "element");
3073        let map_field = &fields[1].get_fields()[0];
3074        assert_eq!(map_field.name(), "key_value");
3075        assert_eq!(map_field.get_fields()[0].name(), "key");
3076        assert_eq!(map_field.get_fields()[1].name(), "value");
3077    }
3078
3079    #[test]
3080    fn fallback_flush_data_page() {
3081        // Tests that `Fallback::flush_data_page` clears all buffers correctly
3082        let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
3083        let values = Arc::new(StringArray::from(raw_values));
3084        let encodings = vec![
3085            Encoding::DELTA_BYTE_ARRAY,
3086            Encoding::DELTA_LENGTH_BYTE_ARRAY,
3087        ];
3088        let data_type = values.data_type().clone();
3089        let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
3090        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3091
3092        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
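        // A tiny data page size limit combined with a small write batch size forces frequent
        // page flushes, exercising the fallback encoder across page boundaries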
3093        let data_page_size_limit: usize = 32;
3094        let write_batch_size: usize = 16;
3095
3096        for encoding in &encodings {
3097            for row_group_size in row_group_sizes {
3098                let props = WriterProperties::builder()
3099                    .set_writer_version(WriterVersion::PARQUET_2_0)
3100                    .set_max_row_group_size(row_group_size)
3101                    .set_dictionary_enabled(false)
3102                    .set_encoding(*encoding)
3103                    .set_data_page_size_limit(data_page_size_limit)
3104                    .set_write_batch_size(write_batch_size)
3105                    .build();
3106
3107                roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
3108                    let string_array_a = StringArray::from(a.clone());
3109                    let string_array_b = StringArray::from(b.clone());
3110                    let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
3111                    let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
3112                    assert_eq!(
3113                        vec_a, vec_b,
3114                        "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
3115                    );
3116                });
3117            }
3118        }
3119    }
3120
3121    #[test]
3122    fn arrow_writer_string_dictionary() {
3123        // define schema
3124        #[allow(deprecated)]
3125        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3126            "dictionary",
3127            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3128            true,
3129            42,
3130            true,
3131        )]));
3132
3133        // create some data
3134        let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3135            .iter()
3136            .copied()
3137            .collect();
3138
3139        // build a record batch
3140        one_column_roundtrip_with_schema(Arc::new(d), schema);
3141    }
3142
3143    #[test]
3144    fn arrow_writer_test_type_compatibility() {
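        // Writes `array1` followed by `array2` (a different but compatible type) through the same
        // writer, then checks that reading back yields `expected_result` under `array1`'s schema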
3145        fn ensure_compatible_write<T1, T2>(array1: T1, array2: T2, expected_result: T1)
3146        where
3147            T1: Array + 'static,
3148            T2: Array + 'static,
3149        {
3150            let schema1 = Arc::new(Schema::new(vec![Field::new(
3151                "a",
3152                array1.data_type().clone(),
3153                false,
3154            )]));
3155
3156            let file = tempfile().unwrap();
3157            let mut writer =
3158                ArrowWriter::try_new(file.try_clone().unwrap(), schema1.clone(), None).unwrap();
3159
3160            let rb1 = RecordBatch::try_new(schema1.clone(), vec![Arc::new(array1)]).unwrap();
3161            writer.write(&rb1).unwrap();
3162
3163            let schema2 = Arc::new(Schema::new(vec![Field::new(
3164                "a",
3165                array2.data_type().clone(),
3166                false,
3167            )]));
3168            let rb2 = RecordBatch::try_new(schema2, vec![Arc::new(array2)]).unwrap();
3169            writer.write(&rb2).unwrap();
3170
3171            writer.close().unwrap();
3172
3173            let mut record_batch_reader =
3174                ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
3175            let actual_batch = record_batch_reader.next().unwrap().unwrap();
3176
3177            let expected_batch =
3178                RecordBatch::try_new(schema1, vec![Arc::new(expected_result)]).unwrap();
3179            assert_eq!(actual_batch, expected_batch);
3180        }
3181
3182        // check compatibility between native and dictionary arrays
3183
3184        ensure_compatible_write(
3185            DictionaryArray::new(
3186                UInt8Array::from_iter_values(vec![0]),
3187                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3188            ),
3189            StringArray::from_iter_values(vec!["barquet"]),
3190            DictionaryArray::new(
3191                UInt8Array::from_iter_values(vec![0, 1]),
3192                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3193            ),
3194        );
3195
3196        ensure_compatible_write(
3197            StringArray::from_iter_values(vec!["parquet"]),
3198            DictionaryArray::new(
3199                UInt8Array::from_iter_values(vec![0]),
3200                Arc::new(StringArray::from_iter_values(vec!["barquet"])),
3201            ),
3202            StringArray::from_iter_values(vec!["parquet", "barquet"]),
3203        );
3204
3205        // check compatibility between dictionaries with different key types
3206
3207        ensure_compatible_write(
3208            DictionaryArray::new(
3209                UInt8Array::from_iter_values(vec![0]),
3210                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3211            ),
3212            DictionaryArray::new(
3213                UInt16Array::from_iter_values(vec![0]),
3214                Arc::new(StringArray::from_iter_values(vec!["barquet"])),
3215            ),
3216            DictionaryArray::new(
3217                UInt8Array::from_iter_values(vec![0, 1]),
3218                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3219            ),
3220        );
3221
3222        // check compatibility between dictionaries with different value types
3223        ensure_compatible_write(
3224            DictionaryArray::new(
3225                UInt8Array::from_iter_values(vec![0]),
3226                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3227            ),
3228            DictionaryArray::new(
3229                UInt8Array::from_iter_values(vec![0]),
3230                Arc::new(LargeStringArray::from_iter_values(vec!["barquet"])),
3231            ),
3232            DictionaryArray::new(
3233                UInt8Array::from_iter_values(vec![0, 1]),
3234                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3235            ),
3236        );
3237
3238        // check compatibility between a dictionary and a native array with a different type
3239        ensure_compatible_write(
3240            DictionaryArray::new(
3241                UInt8Array::from_iter_values(vec![0]),
3242                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3243            ),
3244            LargeStringArray::from_iter_values(vec!["barquet"]),
3245            DictionaryArray::new(
3246                UInt8Array::from_iter_values(vec![0, 1]),
3247                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3248            ),
3249        );
3250
3251        // check compatibility for string types
3252
3253        ensure_compatible_write(
3254            StringArray::from_iter_values(vec!["parquet"]),
3255            LargeStringArray::from_iter_values(vec!["barquet"]),
3256            StringArray::from_iter_values(vec!["parquet", "barquet"]),
3257        );
3258
3259        ensure_compatible_write(
3260            LargeStringArray::from_iter_values(vec!["parquet"]),
3261            StringArray::from_iter_values(vec!["barquet"]),
3262            LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
3263        );
3264
3265        ensure_compatible_write(
3266            StringArray::from_iter_values(vec!["parquet"]),
3267            StringViewArray::from_iter_values(vec!["barquet"]),
3268            StringArray::from_iter_values(vec!["parquet", "barquet"]),
3269        );
3270
3271        ensure_compatible_write(
3272            StringViewArray::from_iter_values(vec!["parquet"]),
3273            StringArray::from_iter_values(vec!["barquet"]),
3274            StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
3275        );
3276
3277        ensure_compatible_write(
3278            LargeStringArray::from_iter_values(vec!["parquet"]),
3279            StringViewArray::from_iter_values(vec!["barquet"]),
3280            LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
3281        );
3282
3283        ensure_compatible_write(
3284            StringViewArray::from_iter_values(vec!["parquet"]),
3285            LargeStringArray::from_iter_values(vec!["barquet"]),
3286            StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
3287        );
3288
3289        // check compatibility for binary types
3290
3291        ensure_compatible_write(
3292            BinaryArray::from_iter_values(vec![b"parquet"]),
3293            LargeBinaryArray::from_iter_values(vec![b"barquet"]),
3294            BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3295        );
3296
3297        ensure_compatible_write(
3298            LargeBinaryArray::from_iter_values(vec![b"parquet"]),
3299            BinaryArray::from_iter_values(vec![b"barquet"]),
3300            LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3301        );
3302
3303        ensure_compatible_write(
3304            BinaryArray::from_iter_values(vec![b"parquet"]),
3305            BinaryViewArray::from_iter_values(vec![b"barquet"]),
3306            BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3307        );
3308
3309        ensure_compatible_write(
3310            BinaryViewArray::from_iter_values(vec![b"parquet"]),
3311            BinaryArray::from_iter_values(vec![b"barquet"]),
3312            BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
3313        );
3314
3315        ensure_compatible_write(
3316            BinaryViewArray::from_iter_values(vec![b"parquet"]),
3317            LargeBinaryArray::from_iter_values(vec![b"barquet"]),
3318            BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
3319        );
3320
3321        ensure_compatible_write(
3322            LargeBinaryArray::from_iter_values(vec![b"parquet"]),
3323            BinaryViewArray::from_iter_values(vec![b"barquet"]),
3324            LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3325        );
3326    }
3327
3328    #[test]
3329    fn arrow_writer_primitive_dictionary() {
3330        // define schema
3331        #[allow(deprecated)]
3332        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3333            "dictionary",
3334            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
3335            true,
3336            42,
3337            true,
3338        )]));
3339
3340        // create some data
3341        let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
3342        builder.append(12345678).unwrap();
3343        builder.append_null();
3344        builder.append(22345678).unwrap();
3345        builder.append(12345678).unwrap();
3346        let d = builder.finish();
3347
3348        one_column_roundtrip_with_schema(Arc::new(d), schema);
3349    }
3350
3351    #[test]
3352    fn arrow_writer_decimal32_dictionary() {
3353        let integers = vec![12345, 56789, 34567];
3354
3355        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3356
3357        let values = Decimal32Array::from(integers.clone())
3358            .with_precision_and_scale(5, 2)
3359            .unwrap();
3360
3361        let array = DictionaryArray::new(keys, Arc::new(values));
3362        one_column_roundtrip(Arc::new(array.clone()), true);
3363
3364        let values = Decimal32Array::from(integers)
3365            .with_precision_and_scale(9, 2)
3366            .unwrap();
3367
3368        let array = array.with_values(Arc::new(values));
3369        one_column_roundtrip(Arc::new(array), true);
3370    }
3371
3372    #[test]
3373    fn arrow_writer_decimal64_dictionary() {
3374        let integers = vec![12345, 56789, 34567];
3375
3376        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3377
3378        let values = Decimal64Array::from(integers.clone())
3379            .with_precision_and_scale(5, 2)
3380            .unwrap();
3381
3382        let array = DictionaryArray::new(keys, Arc::new(values));
3383        one_column_roundtrip(Arc::new(array.clone()), true);
3384
3385        let values = Decimal64Array::from(integers)
3386            .with_precision_and_scale(12, 2)
3387            .unwrap();
3388
3389        let array = array.with_values(Arc::new(values));
3390        one_column_roundtrip(Arc::new(array), true);
3391    }
3392
3393    #[test]
3394    fn arrow_writer_decimal128_dictionary() {
3395        let integers = vec![12345, 56789, 34567];
3396
3397        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3398
3399        let values = Decimal128Array::from(integers.clone())
3400            .with_precision_and_scale(5, 2)
3401            .unwrap();
3402
3403        let array = DictionaryArray::new(keys, Arc::new(values));
3404        one_column_roundtrip(Arc::new(array.clone()), true);
3405
3406        let values = Decimal128Array::from(integers)
3407            .with_precision_and_scale(12, 2)
3408            .unwrap();
3409
3410        let array = array.with_values(Arc::new(values));
3411        one_column_roundtrip(Arc::new(array), true);
3412    }
3413
3414    #[test]
3415    fn arrow_writer_decimal256_dictionary() {
3416        let integers = vec![
3417            i256::from_i128(12345),
3418            i256::from_i128(56789),
3419            i256::from_i128(34567),
3420        ];
3421
3422        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3423
3424        let values = Decimal256Array::from(integers.clone())
3425            .with_precision_and_scale(5, 2)
3426            .unwrap();
3427
3428        let array = DictionaryArray::new(keys, Arc::new(values));
3429        one_column_roundtrip(Arc::new(array.clone()), true);
3430
3431        let values = Decimal256Array::from(integers)
3432            .with_precision_and_scale(12, 2)
3433            .unwrap();
3434
3435        let array = array.with_values(Arc::new(values));
3436        one_column_roundtrip(Arc::new(array), true);
3437    }
3438
3439    #[test]
3440    fn arrow_writer_string_dictionary_unsigned_index() {
3441        // define schema
3442        #[allow(deprecated)]
3443        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3444            "dictionary",
3445            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3446            true,
3447            42,
3448            true,
3449        )]));
3450
3451        // create some data
3452        let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3453            .iter()
3454            .copied()
3455            .collect();
3456
3457        one_column_roundtrip_with_schema(Arc::new(d), schema);
3458    }
3459
3460    #[test]
3461    fn u32_min_max() {
3462        // check values roundtrip through parquet
3463        let src = [
3464            u32::MIN,
3465            u32::MIN + 1,
3466            (i32::MAX as u32) - 1,
3467            i32::MAX as u32,
3468            (i32::MAX as u32) + 1,
3469            u32::MAX - 1,
3470            u32::MAX,
3471        ];
3472        let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
3473        let files = one_column_roundtrip(values, false);
3474
3475        for file in files {
3476            // check statistics are valid
3477            let reader = SerializedFileReader::new(file).unwrap();
3478            let metadata = reader.metadata();
3479
3480            let mut row_offset = 0;
3481            for row_group in metadata.row_groups() {
3482                assert_eq!(row_group.num_columns(), 1);
3483                let column = row_group.column(0);
3484
3485                let num_values = column.num_values() as usize;
3486                let src_slice = &src[row_offset..row_offset + num_values];
3487                row_offset += column.num_values() as usize;
3488
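                // UInt32 is stored as a signed INT32 physical type, so statistics are Int32 and
                // are reinterpreted as u32 before comparing against the source values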
3489                let stats = column.statistics().unwrap();
3490                if let Statistics::Int32(stats) = stats {
3491                    assert_eq!(
3492                        *stats.min_opt().unwrap() as u32,
3493                        *src_slice.iter().min().unwrap()
3494                    );
3495                    assert_eq!(
3496                        *stats.max_opt().unwrap() as u32,
3497                        *src_slice.iter().max().unwrap()
3498                    );
3499                } else {
3500                    panic!("Statistics::Int32 missing")
3501                }
3502            }
3503        }
3504    }
3505
3506    #[test]
3507    fn u64_min_max() {
3508        // check values roundtrip through parquet
3509        let src = [
3510            u64::MIN,
3511            u64::MIN + 1,
3512            (i64::MAX as u64) - 1,
3513            i64::MAX as u64,
3514            (i64::MAX as u64) + 1,
3515            u64::MAX - 1,
3516            u64::MAX,
3517        ];
3518        let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
3519        let files = one_column_roundtrip(values, false);
3520
3521        for file in files {
3522            // check statistics are valid
3523            let reader = SerializedFileReader::new(file).unwrap();
3524            let metadata = reader.metadata();
3525
3526            let mut row_offset = 0;
3527            for row_group in metadata.row_groups() {
3528                assert_eq!(row_group.num_columns(), 1);
3529                let column = row_group.column(0);
3530
3531                let num_values = column.num_values() as usize;
3532                let src_slice = &src[row_offset..row_offset + num_values];
3533                row_offset += column.num_values() as usize;
3534
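                // UInt64 is stored as a signed INT64 physical type, so statistics are Int64 and
                // are reinterpreted as u64 before comparing against the source values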
3535                let stats = column.statistics().unwrap();
3536                if let Statistics::Int64(stats) = stats {
3537                    assert_eq!(
3538                        *stats.min_opt().unwrap() as u64,
3539                        *src_slice.iter().min().unwrap()
3540                    );
3541                    assert_eq!(
3542                        *stats.max_opt().unwrap() as u64,
3543                        *src_slice.iter().max().unwrap()
3544                    );
3545                } else {
3546                    panic!("Statistics::Int64 missing")
3547                }
3548            }
3549        }
3550    }
3551
3552    #[test]
3553    fn statistics_null_counts_only_nulls() {
3554        // check that null-count statistics are correct for columns containing only NULLs
3555        let values = Arc::new(UInt64Array::from(vec![None, None]));
3556        let files = one_column_roundtrip(values, true);
3557
3558        for file in files {
3559            // check statistics are valid
3560            let reader = SerializedFileReader::new(file).unwrap();
3561            let metadata = reader.metadata();
3562            assert_eq!(metadata.num_row_groups(), 1);
3563            let row_group = metadata.row_group(0);
3564            assert_eq!(row_group.num_columns(), 1);
3565            let column = row_group.column(0);
3566            let stats = column.statistics().unwrap();
3567            assert_eq!(stats.null_count_opt(), Some(2));
3568        }
3569    }
3570
3571    #[test]
3572    fn test_list_of_struct_roundtrip() {
3573        // define schema
3574        let int_field = Field::new("a", DataType::Int32, true);
3575        let int_field2 = Field::new("b", DataType::Int32, true);
3576
3577        let int_builder = Int32Builder::with_capacity(10);
3578        let int_builder2 = Int32Builder::with_capacity(10);
3579
3580        let struct_builder = StructBuilder::new(
3581            vec![int_field, int_field2],
3582            vec![Box::new(int_builder), Box::new(int_builder2)],
3583        );
3584        let mut list_builder = ListBuilder::new(struct_builder);
3585
3586        // Construct the following array
3587        // [{a: 1, b: 2}], [], null, [null, null], [{a: null, b: 3}], [{a: 2, b: null}]
3588
3589        // [{a: 1, b: 2}]
3590        let values = list_builder.values();
3591        values
3592            .field_builder::<Int32Builder>(0)
3593            .unwrap()
3594            .append_value(1);
3595        values
3596            .field_builder::<Int32Builder>(1)
3597            .unwrap()
3598            .append_value(2);
3599        values.append(true);
3600        list_builder.append(true);
3601
3602        // []
3603        list_builder.append(true);
3604
3605        // null
3606        list_builder.append(false);
3607
3608        // [null, null]
3609        let values = list_builder.values();
3610        values
3611            .field_builder::<Int32Builder>(0)
3612            .unwrap()
3613            .append_null();
3614        values
3615            .field_builder::<Int32Builder>(1)
3616            .unwrap()
3617            .append_null();
3618        values.append(false);
3619        values
3620            .field_builder::<Int32Builder>(0)
3621            .unwrap()
3622            .append_null();
3623        values
3624            .field_builder::<Int32Builder>(1)
3625            .unwrap()
3626            .append_null();
3627        values.append(false);
3628        list_builder.append(true);
3629
3630        // [{a: null, b: 3}]
3631        let values = list_builder.values();
3632        values
3633            .field_builder::<Int32Builder>(0)
3634            .unwrap()
3635            .append_null();
3636        values
3637            .field_builder::<Int32Builder>(1)
3638            .unwrap()
3639            .append_value(3);
3640        values.append(true);
3641        list_builder.append(true);
3642
3643        // [{a: 2, b: null}]
3644        let values = list_builder.values();
3645        values
3646            .field_builder::<Int32Builder>(0)
3647            .unwrap()
3648            .append_value(2);
3649        values
3650            .field_builder::<Int32Builder>(1)
3651            .unwrap()
3652            .append_null();
3653        values.append(true);
3654        list_builder.append(true);
3655
3656        let array = Arc::new(list_builder.finish());
3657
3658        one_column_roundtrip(array, true);
3659    }
3660
3661    fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
3662        metadata.row_groups().iter().map(|x| x.num_rows()).collect()
3663    }
3664
3665    #[test]
3666    fn test_aggregates_records() {
3667        let arrays = [
3668            Int32Array::from((0..100).collect::<Vec<_>>()),
3669            Int32Array::from((0..50).collect::<Vec<_>>()),
3670            Int32Array::from((200..500).collect::<Vec<_>>()),
3671        ];
3672
3673        let schema = Arc::new(Schema::new(vec![Field::new(
3674            "int",
3675            ArrowDataType::Int32,
3676            false,
3677        )]));
3678
3679        let file = tempfile::tempfile().unwrap();
3680
3681        let props = WriterProperties::builder()
3682            .set_max_row_group_size(200)
3683            .build();
3684
3685        let mut writer =
3686            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
3687
3688        for array in arrays {
3689            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
3690            writer.write(&batch).unwrap();
3691        }
3692
3693        writer.close().unwrap();
3694
3695        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
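        // 100 + 50 + 300 = 450 rows with a 200 row limit should yield row groups of 200, 200 and 50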
3696        assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);
3697
3698        let batches = builder
3699            .with_batch_size(100)
3700            .build()
3701            .unwrap()
3702            .collect::<ArrowResult<Vec<_>>>()
3703            .unwrap();
3704
3705        assert_eq!(batches.len(), 5);
3706        assert!(batches.iter().all(|x| x.num_columns() == 1));
3707
3708        let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3709
3710        assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
3711
3712        let values: Vec<_> = batches
3713            .iter()
3714            .flat_map(|x| {
3715                x.column(0)
3716                    .as_any()
3717                    .downcast_ref::<Int32Array>()
3718                    .unwrap()
3719                    .values()
3720                    .iter()
3721                    .cloned()
3722            })
3723            .collect();
3724
3725        let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
3726        assert_eq!(&values, &expected_values)
3727    }
3728
3729    #[test]
3730    fn complex_aggregate() {
3731        // Tests aggregating nested data
3732        let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
3733        let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
3734        let struct_a = Arc::new(Field::new(
3735            "struct_a",
3736            DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
3737            true,
3738        ));
3739
3740        let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
3741        let struct_b = Arc::new(Field::new(
3742            "struct_b",
3743            DataType::Struct(vec![list_a.clone()].into()),
3744            false,
3745        ));
3746
3747        let schema = Arc::new(Schema::new(vec![struct_b]));
3748
3749        // create nested data
3750        let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
3751        let field_b_array =
3752            Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);
3753
3754        let struct_a_array = StructArray::from(vec![
3755            (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
3756            (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
3757        ]);
3758
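        // Five lists with offsets [0, 1, 1, 3, 3, 5]; rows 1 and 3 are null per the validity buffer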
3759        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3760            .len(5)
3761            .add_buffer(Buffer::from_iter(vec![
3762                0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
3763            ]))
3764            .null_bit_buffer(Some(Buffer::from_iter(vec![
3765                true, false, true, false, true,
3766            ])))
3767            .child_data(vec![struct_a_array.into_data()])
3768            .build()
3769            .unwrap();
3770
3771        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3772        let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);
3773
3774        let batch1 =
3775            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3776                .unwrap();
3777
3778        let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
3779        let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);
3780
3781        let struct_a_array = StructArray::from(vec![
3782            (field_a, Arc::new(field_a_array) as ArrayRef),
3783            (field_b, Arc::new(field_b_array) as ArrayRef),
3784        ]);
3785
3786        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3787            .len(2)
3788            .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
3789            .child_data(vec![struct_a_array.into_data()])
3790            .build()
3791            .unwrap();
3792
3793        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3794        let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);
3795
3796        let batch2 =
3797            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3798                .unwrap();
3799
3800        let batches = &[batch1, batch2];
3801
3802        // Verify data is as expected
3803
3804        let expected = r#"
3805            +-------------------------------------------------------------------------------------------------------+
3806            | struct_b                                                                                              |
3807            +-------------------------------------------------------------------------------------------------------+
3808            | {list: [{leaf_a: 1, leaf_b: 1}]}                                                                      |
3809            | {list: }                                                                                              |
3810            | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]}                                               |
3811            | {list: }                                                                                              |
3812            | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]}                                                |
3813            | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
3814            | {list: [{leaf_a: 10, leaf_b: }]}                                                                      |
3815            +-------------------------------------------------------------------------------------------------------+
3816        "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");
3817
3818        let actual = pretty_format_batches(batches).unwrap().to_string();
3819        assert_eq!(actual, expected);
3820
3821        // Write data
3822        let file = tempfile::tempfile().unwrap();
3823        let props = WriterProperties::builder()
3824            .set_max_row_group_size(6)
3825            .build();
3826
3827        let mut writer =
3828            ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
3829
3830        for batch in batches {
3831            writer.write(batch).unwrap();
3832        }
3833        writer.close().unwrap();
3834
3835        // Read data
3836        // The writer should have written the entire first batch and the first row of the second
3837        // batch to the first row group, leaving a single row in the second row group
3838
3839        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3840        assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);
3841
3842        let batches = builder
3843            .with_batch_size(2)
3844            .build()
3845            .unwrap()
3846            .collect::<ArrowResult<Vec<_>>>()
3847            .unwrap();
3848
3849        assert_eq!(batches.len(), 4);
3850        let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3851        assert_eq!(&batch_counts, &[2, 2, 2, 1]);
3852
3853        let actual = pretty_format_batches(&batches).unwrap().to_string();
3854        assert_eq!(actual, expected);
3855    }
3856
3857    #[test]
3858    fn test_arrow_writer_metadata() {
3859        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3860        let file_schema = batch_schema.clone().with_metadata(
3861            vec![("foo".to_string(), "bar".to_string())]
3862                .into_iter()
3863                .collect(),
3864        );
3865
3866        let batch = RecordBatch::try_new(
3867            Arc::new(batch_schema),
3868            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3869        )
3870        .unwrap();
3871
3872        let mut buf = Vec::with_capacity(1024);
3873        let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
3874        writer.write(&batch).unwrap();
3875        writer.close().unwrap();
3876    }
3877
3878    #[test]
3879    fn test_arrow_writer_nullable() {
3880        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3881        let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
3882        let file_schema = Arc::new(file_schema);
3883
3884        let batch = RecordBatch::try_new(
3885            Arc::new(batch_schema),
3886            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3887        )
3888        .unwrap();
3889
3890        let mut buf = Vec::with_capacity(1024);
3891        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3892        writer.write(&batch).unwrap();
3893        writer.close().unwrap();
3894
3895        let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
3896        let back = read.next().unwrap().unwrap();
3897        assert_eq!(back.schema(), file_schema);
3898        assert_ne!(back.schema(), batch.schema());
3899        assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
3900    }
3901
3902    #[test]
3903    fn in_progress_accounting() {
3904        // define schema
3905        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3906
3907        // create some data
3908        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3909
3910        // build a record batch
3911        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
3912
3913        let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
3914
3915        // starts empty
3916        assert_eq!(writer.in_progress_size(), 0);
3917        assert_eq!(writer.in_progress_rows(), 0);
3918        assert_eq!(writer.memory_size(), 0);
3919        assert_eq!(writer.bytes_written(), 4); // Initial 4-byte "PAR1" magic header
3920        writer.write(&batch).unwrap();
3921
3922        // updated on write
3923        let initial_size = writer.in_progress_size();
3924        assert!(initial_size > 0);
3925        assert_eq!(writer.in_progress_rows(), 5);
3926        let initial_memory = writer.memory_size();
3927        assert!(initial_memory > 0);
3928        // memory estimate is at least as large as the estimated encoded size
3929        assert!(
3930            initial_size <= initial_memory,
3931            "{initial_size} <= {initial_memory}"
3932        );
3933
3934        // updated on second write
3935        writer.write(&batch).unwrap();
3936        assert!(writer.in_progress_size() > initial_size);
3937        assert_eq!(writer.in_progress_rows(), 10);
3938        assert!(writer.memory_size() > initial_memory);
3939        assert!(
3940            writer.in_progress_size() <= writer.memory_size(),
3941            "in_progress_size {} <= memory_size {}",
3942            writer.in_progress_size(),
3943            writer.memory_size()
3944        );
3945
3946        // in progress tracking is cleared, but the overall data written is updated
3947        let pre_flush_bytes_written = writer.bytes_written();
3948        writer.flush().unwrap();
3949        assert_eq!(writer.in_progress_size(), 0);
3950        assert_eq!(writer.memory_size(), 0);
3951        assert!(writer.bytes_written() > pre_flush_bytes_written);
3952
3953        writer.close().unwrap();
3954    }
3955
3956    #[test]
3957    fn test_writer_all_null() {
3958        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3959        let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
3960        let batch = RecordBatch::try_from_iter(vec![
3961            ("a", Arc::new(a) as ArrayRef),
3962            ("b", Arc::new(b) as ArrayRef),
3963        ])
3964        .unwrap();
3965
3966        let mut buf = Vec::with_capacity(1024);
3967        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
3968        writer.write(&batch).unwrap();
3969        writer.close().unwrap();
3970
3971        let bytes = Bytes::from(buf);
3972        let options = ReadOptionsBuilder::new().with_page_index().build();
3973        let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
3974        let index = reader.metadata().offset_index().unwrap();
3975
3976        assert_eq!(index.len(), 1);
3977        assert_eq!(index[0].len(), 2); // 2 columns
3978        assert_eq!(index[0][0].page_locations().len(), 1); // 1 page
3979        assert_eq!(index[0][1].page_locations().len(), 1); // 1 page
3980    }
3981
3982    #[test]
3983    fn test_disabled_statistics_with_page() {
3984        let file_schema = Schema::new(vec![
3985            Field::new("a", DataType::Utf8, true),
3986            Field::new("b", DataType::Utf8, true),
3987        ]);
3988        let file_schema = Arc::new(file_schema);
3989
3990        let batch = RecordBatch::try_new(
3991            file_schema.clone(),
3992            vec![
3993                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3994                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3995            ],
3996        )
3997        .unwrap();
3998
3999        let props = WriterProperties::builder()
4000            .set_statistics_enabled(EnabledStatistics::None)
4001            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
4002            .build();
4003
4004        let mut buf = Vec::with_capacity(1024);
4005        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
4006        writer.write(&batch).unwrap();
4007
4008        let metadata = writer.close().unwrap();
4009        assert_eq!(metadata.num_row_groups(), 1);
4010        let row_group = metadata.row_group(0);
4011        assert_eq!(row_group.num_columns(), 2);
4012        // Column "a" has both offset and column index, as requested
4013        assert!(row_group.column(0).offset_index_offset().is_some());
4014        assert!(row_group.column(0).column_index_offset().is_some());
4015        // Column "b" should only have offset index
4016        assert!(row_group.column(1).offset_index_offset().is_some());
4017        assert!(row_group.column(1).column_index_offset().is_none());
4018
4019        let options = ReadOptionsBuilder::new().with_page_index().build();
4020        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
4021
4022        let row_group = reader.get_row_group(0).unwrap();
4023        let a_col = row_group.metadata().column(0);
4024        let b_col = row_group.metadata().column(1);
4025
4026        // Column chunk of column "a" should have chunk-level statistics
4027        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
4028            let min = byte_array_stats.min_opt().unwrap();
4029            let max = byte_array_stats.max_opt().unwrap();
4030
4031            assert_eq!(min.as_bytes(), b"a");
4032            assert_eq!(max.as_bytes(), b"d");
4033        } else {
4034            panic!("expecting Statistics::ByteArray");
4035        }
4036
4037        // The column chunk for column "b" shouldn't have statistics
4038        assert!(b_col.statistics().is_none());
4039
4040        let offset_index = reader.metadata().offset_index().unwrap();
4041        assert_eq!(offset_index.len(), 1); // 1 row group
4042        assert_eq!(offset_index[0].len(), 2); // 2 columns
4043
4044        let column_index = reader.metadata().column_index().unwrap();
4045        assert_eq!(column_index.len(), 1); // 1 row group
4046        assert_eq!(column_index[0].len(), 2); // 2 columns
4047
4048        let a_idx = &column_index[0][0];
4049        assert!(
4050            matches!(a_idx, ColumnIndexMetaData::BYTE_ARRAY(_)),
4051            "{a_idx:?}"
4052        );
4053        let b_idx = &column_index[0][1];
4054        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
4055    }
4056
4057    #[test]
4058    fn test_disabled_statistics_with_chunk() {
4059        let file_schema = Schema::new(vec![
4060            Field::new("a", DataType::Utf8, true),
4061            Field::new("b", DataType::Utf8, true),
4062        ]);
4063        let file_schema = Arc::new(file_schema);
4064
4065        let batch = RecordBatch::try_new(
4066            file_schema.clone(),
4067            vec![
4068                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
4069                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
4070            ],
4071        )
4072        .unwrap();
4073
4074        let props = WriterProperties::builder()
4075            .set_statistics_enabled(EnabledStatistics::None)
4076            .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
4077            .build();
4078
4079        let mut buf = Vec::with_capacity(1024);
4080        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
4081        writer.write(&batch).unwrap();
4082
4083        let metadata = writer.close().unwrap();
4084        assert_eq!(metadata.num_row_groups(), 1);
4085        let row_group = metadata.row_group(0);
4086        assert_eq!(row_group.num_columns(), 2);
4087        // Column "a" should only have offset index
4088        assert!(row_group.column(0).offset_index_offset().is_some());
4089        assert!(row_group.column(0).column_index_offset().is_none());
4090        // Column "b" should only have offset index
4091        assert!(row_group.column(1).offset_index_offset().is_some());
4092        assert!(row_group.column(1).column_index_offset().is_none());
4093
4094        let options = ReadOptionsBuilder::new().with_page_index().build();
4095        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
4096
4097        let row_group = reader.get_row_group(0).unwrap();
4098        let a_col = row_group.metadata().column(0);
4099        let b_col = row_group.metadata().column(1);
4100
4101        // Column chunk of column "a" should have chunk-level statistics
4102        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
4103            let min = byte_array_stats.min_opt().unwrap();
4104            let max = byte_array_stats.max_opt().unwrap();
4105
4106            assert_eq!(min.as_bytes(), b"a");
4107            assert_eq!(max.as_bytes(), b"d");
4108        } else {
4109            panic!("expecting Statistics::ByteArray");
4110        }
4111
4112        // The column chunk for column "b" shouldn't have statistics
4113        assert!(b_col.statistics().is_none());
4114
4115        let column_index = reader.metadata().column_index().unwrap();
4116        assert_eq!(column_index.len(), 1); // 1 row group
4117        assert_eq!(column_index[0].len(), 2); // 2 columns
4118
4119        let a_idx = &column_index[0][0];
4120        assert!(matches!(a_idx, ColumnIndexMetaData::NONE), "{a_idx:?}");
4121        let b_idx = &column_index[0][1];
4122        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
4123    }
4124
4125    #[test]
4126    fn test_arrow_writer_skip_metadata() {
4127        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
4128        let file_schema = Arc::new(batch_schema.clone());
4129
4130        let batch = RecordBatch::try_new(
4131            Arc::new(batch_schema),
4132            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4133        )
4134        .unwrap();
4135        let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
4136
4137        let mut buf = Vec::with_capacity(1024);
4138        let mut writer =
4139            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
4140        writer.write(&batch).unwrap();
4141        writer.close().unwrap();
4142
4143        let bytes = Bytes::from(buf);
4144        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
4145        assert_eq!(file_schema, *reader_builder.schema());
4146        if let Some(key_value_metadata) = reader_builder
4147            .metadata()
4148            .file_metadata()
4149            .key_value_metadata()
4150        {
4151            assert!(
4152                !key_value_metadata
4153                    .iter()
4154                    .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY)
4155            );
4156        }
4157    }
4158
4159    #[test]
4160    fn test_arrow_writer_explicit_schema() {
4161        // Write an int32 array using explicit int64 storage
4162        let batch_schema = Arc::new(Schema::new(vec![Field::new(
4163            "integers",
4164            DataType::Int32,
4165            true,
4166        )]));
4167        let parquet_schema = Type::group_type_builder("root")
4168            .with_fields(vec![
4169                Type::primitive_type_builder("integers", crate::basic::Type::INT64)
4170                    .build()
4171                    .unwrap()
4172                    .into(),
4173            ])
4174            .build()
4175            .unwrap();
4176        let parquet_schema_descr = SchemaDescriptor::new(parquet_schema.into());
4177
4178        let batch = RecordBatch::try_new(
4179            batch_schema.clone(),
4180            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4181        )
4182        .unwrap();
4183
4184        let explicit_schema_options =
4185            ArrowWriterOptions::new().with_parquet_schema(parquet_schema_descr);
4186        let mut buf = Vec::with_capacity(1024);
4187        let mut writer = ArrowWriter::try_new_with_options(
4188            &mut buf,
4189            batch_schema.clone(),
4190            explicit_schema_options,
4191        )
4192        .unwrap();
4193        writer.write(&batch).unwrap();
4194        writer.close().unwrap();
4195
4196        let bytes = Bytes::from(buf);
4197        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
4198
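        // The explicit Parquet schema (INT64), not the batch schema (Int32), determines the stored type,
        // so reading back should yield an Int64 column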
4199        let expected_schema = Arc::new(Schema::new(vec![Field::new(
4200            "integers",
4201            DataType::Int64,
4202            true,
4203        )]));
4204        assert_eq!(reader_builder.schema(), &expected_schema);
4205
4206        let batches = reader_builder
4207            .build()
4208            .unwrap()
4209            .collect::<Result<Vec<_>, ArrowError>>()
4210            .unwrap();
4211        assert_eq!(batches.len(), 1);
4212
4213        let expected_batch = RecordBatch::try_new(
4214            expected_schema.clone(),
4215            vec![Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as _],
4216        )
4217        .unwrap();
4218        assert_eq!(batches[0], expected_batch);
4219    }
4220
4221    #[test]
4222    fn mismatched_schemas() {
4223        let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
4224        let file_schema = Arc::new(Schema::new(vec![Field::new(
4225            "temperature",
4226            DataType::Float64,
4227            false,
4228        )]));
4229
4230        let batch = RecordBatch::try_new(
4231            Arc::new(batch_schema),
4232            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4233        )
4234        .unwrap();
4235
4236        let mut buf = Vec::with_capacity(1024);
4237        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
4238
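        // Writing a batch whose column type (Int32) does not match the writer schema (Float64) must fail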
4239        let err = writer.write(&batch).unwrap_err().to_string();
4240        assert_eq!(
4241            err,
4242            "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
4243        );
4244    }
4245
4246    #[test]
4247    // https://github.com/apache/arrow-rs/issues/6988
4248    fn test_roundtrip_empty_schema() {
4249        // create empty record batch with empty schema
4250        let empty_batch = RecordBatch::try_new_with_options(
4251            Arc::new(Schema::empty()),
4252            vec![],
4253            &RecordBatchOptions::default().with_row_count(Some(0)),
4254        )
4255        .unwrap();
4256
4257        // write to parquet
4258        let mut parquet_bytes: Vec<u8> = Vec::new();
4259        let mut writer =
4260            ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
4261        writer.write(&empty_batch).unwrap();
4262        writer.close().unwrap();
4263
4264        // read from parquet
4265        let bytes = Bytes::from(parquet_bytes);
4266        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
4267        assert_eq!(reader.schema(), &empty_batch.schema());
4268        let batches: Vec<_> = reader
4269            .build()
4270            .unwrap()
4271            .collect::<ArrowResult<Vec<_>>>()
4272            .unwrap();
4273        assert_eq!(batches.len(), 0);
4274    }
4275
4276    #[test]
4277    fn test_page_stats_not_written_by_default() {
4278        let string_field = Field::new("a", DataType::Utf8, false);
4279        let schema = Schema::new(vec![string_field]);
4280        let raw_string_values = vec!["Blart Versenwald III"];
4281        let string_values = StringArray::from(raw_string_values.clone());
4282        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();
4283
4284        let props = WriterProperties::builder()
4285            .set_statistics_enabled(EnabledStatistics::Page)
4286            .set_dictionary_enabled(false)
4287            .set_encoding(Encoding::PLAIN)
4288            .set_compression(crate::basic::Compression::UNCOMPRESSED)
4289            .build();
4290
4291        let file = roundtrip_opts(&batch, props);
4292
4293        // read file and decode page headers
4294        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
4295
4296        // decode first page header
4297        let first_page = &file[4..];
4298        let mut prot = ThriftSliceInputProtocol::new(first_page);
4299        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
4300        let stats = hdr.data_page_header.unwrap().statistics;
4301
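        // EnabledStatistics::Page collects statistics for the column index, but they are only copied into
        // the data page headers when write_page_header_statistics is enabled (see the next test)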
4302        assert!(stats.is_none());
4303    }
4304
4305    #[test]
4306    fn test_page_stats_when_enabled() {
4307        let string_field = Field::new("a", DataType::Utf8, false);
4308        let schema = Schema::new(vec![string_field]);
4309        let raw_string_values = vec!["Blart Versenwald III", "Andrew Lamb"];
4310        let string_values = StringArray::from(raw_string_values.clone());
4311        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();
4312
4313        let props = WriterProperties::builder()
4314            .set_statistics_enabled(EnabledStatistics::Page)
4315            .set_dictionary_enabled(false)
4316            .set_encoding(Encoding::PLAIN)
4317            .set_write_page_header_statistics(true)
4318            .set_compression(crate::basic::Compression::UNCOMPRESSED)
4319            .build();
4320
4321        let file = roundtrip_opts(&batch, props);
4322
4323        // read file and decode page headers
4324        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
4325
4326        // decode first page header
4327        let first_page = &file[4..];
4328        let mut prot = ThriftSliceInputProtocol::new(first_page);
4329        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
4330        let stats = hdr.data_page_header.unwrap().statistics;
4331
4332        let stats = stats.unwrap();
4333        // check that min/max were actually written to the page
4334        assert!(stats.is_max_value_exact.unwrap());
4335        assert!(stats.is_min_value_exact.unwrap());
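        // "Andrew Lamb" sorts before "Blart Versenwald III", so it is the min and the other value is the max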
4336        assert_eq!(stats.max_value.unwrap(), "Blart Versenwald III".as_bytes());
4337        assert_eq!(stats.min_value.unwrap(), "Andrew Lamb".as_bytes());
4338    }
4339
4340    #[test]
4341    fn test_page_stats_truncation() {
4342        let string_field = Field::new("a", DataType::Utf8, false);
4343        let binary_field = Field::new("b", DataType::Binary, false);
4344        let schema = Schema::new(vec![string_field, binary_field]);
4345
4346        let raw_string_values = vec!["Blart Versenwald III"];
4347        let raw_binary_values = [b"Blart Versenwald III".to_vec()];
4348        let raw_binary_value_refs = raw_binary_values
4349            .iter()
4350            .map(|x| x.as_slice())
4351            .collect::<Vec<_>>();
4352
4353        let string_values = StringArray::from(raw_string_values.clone());
4354        let binary_values = BinaryArray::from(raw_binary_value_refs);
4355        let batch = RecordBatch::try_new(
4356            Arc::new(schema),
4357            vec![Arc::new(string_values), Arc::new(binary_values)],
4358        )
4359        .unwrap();
4360
4361        let props = WriterProperties::builder()
4362            .set_statistics_truncate_length(Some(2))
4363            .set_dictionary_enabled(false)
4364            .set_encoding(Encoding::PLAIN)
4365            .set_write_page_header_statistics(true)
4366            .set_compression(crate::basic::Compression::UNCOMPRESSED)
4367            .build();
4368
4369        let file = roundtrip_opts(&batch, props);
4370
4371        // read file and decode page headers
4372        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
4373
4374        // decode first page header
4375        let first_page = &file[4..];
4376        let mut prot = ThriftSliceInputProtocol::new(first_page);
4377        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
4378        let stats = hdr.data_page_header.unwrap().statistics;
4379        assert!(stats.is_some());
4380        let stats = stats.unwrap();
4381        // check that min/max were properly truncated
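        // truncation increments the last byte of the shortened max ("Bl" -> "Bm") so it remains a valid upper bound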
4382        assert!(!stats.is_max_value_exact.unwrap());
4383        assert!(!stats.is_min_value_exact.unwrap());
4384        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
4385        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
4386
4387        // check the second page (the data page for the binary column "b") now
4388        let second_page = &prot.as_slice()[hdr.compressed_page_size as usize..];
4389        let mut prot = ThriftSliceInputProtocol::new(second_page);
4390        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
4391        let stats = hdr.data_page_header.unwrap().statistics;
4392        assert!(stats.is_some());
4393        let stats = stats.unwrap();
4394        // check that min/max were properly truncated
4395        assert!(!stats.is_max_value_exact.unwrap());
4396        assert!(!stats.is_min_value_exact.unwrap());
4397        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
4398        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
4399    }
4400
4401    #[test]
4402    fn test_page_encoding_statistics_roundtrip() {
4403        let batch_schema = Schema::new(vec![Field::new(
4404            "int32",
4405            arrow_schema::DataType::Int32,
4406            false,
4407        )]);
4408
4409        let batch = RecordBatch::try_new(
4410            Arc::new(batch_schema.clone()),
4411            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4412        )
4413        .unwrap();
4414
4415        let mut file: File = tempfile::tempfile().unwrap();
4416        let mut writer = ArrowWriter::try_new(&mut file, Arc::new(batch_schema), None).unwrap();
4417        writer.write(&batch).unwrap();
4418        let file_metadata = writer.close().unwrap();
4419
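        // page encoding stats are collected by default and should be present in the metadata returned on close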
4420        assert_eq!(file_metadata.num_row_groups(), 1);
4421        assert_eq!(file_metadata.row_group(0).num_columns(), 1);
4422        assert!(
4423            file_metadata
4424                .row_group(0)
4425                .column(0)
4426                .page_encoding_stats()
4427                .is_some()
4428        );
4429        let chunk_page_stats = file_metadata
4430            .row_group(0)
4431            .column(0)
4432            .page_encoding_stats()
4433            .unwrap();
4434
4435        // check that the read metadata is also correct
4436        let options = ReadOptionsBuilder::new().with_page_index().build();
4437        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
4438
4439        let rowgroup = reader.get_row_group(0).expect("row group missing");
4440        assert_eq!(rowgroup.num_columns(), 1);
4441        let column = rowgroup.metadata().column(0);
4442        assert!(column.page_encoding_stats().is_some());
4443        let file_page_stats = column.page_encoding_stats().unwrap();
4444        assert_eq!(chunk_page_stats, file_page_stats);
4445    }
4446
4447    #[test]
4448    fn test_different_dict_page_size_limit() {
4449        let array = Arc::new(Int64Array::from_iter(0..1024 * 1024));
4450        let schema = Arc::new(Schema::new(vec![
4451            Field::new("col0", arrow_schema::DataType::Int64, false),
4452            Field::new("col1", arrow_schema::DataType::Int64, false),
4453        ]));
4454        let batch =
4455            arrow_array::RecordBatch::try_new(schema.clone(), vec![array.clone(), array]).unwrap();
4456
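        // col0 uses the file-wide 1 MiB dictionary page size limit, while col1 overrides it to 4 MiB;
        // the assertions below verify that each column's dictionary page grew to its own limit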
4457        let props = WriterProperties::builder()
4458            .set_dictionary_page_size_limit(1024 * 1024)
4459            .set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 1024 * 1024 * 4)
4460            .build();
4461        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
4462        writer.write(&batch).unwrap();
4463        let data = Bytes::from(writer.into_inner().unwrap());
4464
4465        let mut metadata = ParquetMetaDataReader::new();
4466        metadata.try_parse(&data).unwrap();
4467        let metadata = metadata.finish().unwrap();
4468        let col0_meta = metadata.row_group(0).column(0);
4469        let col1_meta = metadata.row_group(0).column(1);
4470
4471        let get_dict_page_size = move |meta: &ColumnChunkMetaData| {
4472            let mut reader =
4473                SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
4474            let page = reader.get_next_page().unwrap().unwrap();
4475            match page {
4476                Page::DictionaryPage { buf, .. } => buf.len(),
4477                _ => panic!("expected DictionaryPage"),
4478            }
4479        };
4480
4481        assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
4482        assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
4483    }
4484}