parquet/arrow/arrow_writer/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains the writer which writes Arrow data into Parquet files.
19
20use bytes::Bytes;
21use std::io::{Read, Write};
22use std::iter::Peekable;
23use std::slice::Iter;
24use std::sync::{Arc, Mutex};
25use std::vec::IntoIter;
26
27use arrow_array::cast::AsArray;
28use arrow_array::types::*;
29use arrow_array::{ArrayRef, Int32Array, RecordBatch, RecordBatchWriter};
30use arrow_schema::{
31    ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef, TimeUnit,
32};
33
34use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};
35
36use crate::arrow::ArrowSchemaConverter;
37use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
38use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
39use crate::column::page_encryption::PageEncryptor;
40use crate::column::writer::encoder::ColumnValueEncoder;
41use crate::column::writer::{
42    ColumnCloseResult, ColumnWriter, GenericColumnWriter, get_column_writer,
43};
44use crate::data_type::{ByteArray, FixedLenByteArray};
45#[cfg(feature = "encryption")]
46use crate::encryption::encrypt::FileEncryptor;
47use crate::errors::{ParquetError, Result};
48use crate::file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaData};
49use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
50use crate::file::reader::{ChunkReader, Length};
51use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
52use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
53use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
54use levels::{ArrayLevels, calculate_array_levels};
55
56mod byte_array;
57mod levels;
58
59/// Encodes [`RecordBatch`] to parquet
60///
61/// Writes Arrow `RecordBatch`es to a Parquet writer. Multiple [`RecordBatch`]es are encoded
62/// into the same row group, up to `max_row_group_size` rows. Any remaining rows are
63/// flushed on close, so the final row group in the output file may contain
64/// fewer than `max_row_group_size` rows.
65///
66/// # Example: Writing `RecordBatch`es
67/// ```
68/// # use std::sync::Arc;
69/// # use bytes::Bytes;
70/// # use arrow_array::{ArrayRef, Int64Array};
71/// # use arrow_array::RecordBatch;
72/// # use parquet::arrow::arrow_writer::ArrowWriter;
73/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
74/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
75/// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
76///
77/// let mut buffer = Vec::new();
78/// let mut writer = ArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
79/// writer.write(&to_write).unwrap();
80/// writer.close().unwrap();
81///
82/// let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap();
83/// let read = reader.next().unwrap().unwrap();
84///
85/// assert_eq!(to_write, read);
86/// ```
87///
88/// # Memory Usage and Limiting
89///
90/// The nature of Parquet requires buffering of an entire row group before it can
91/// be flushed to the underlying writer. Data is mostly buffered in its encoded
92/// form, reducing memory usage. However, some data such as dictionary keys,
93/// large strings or very nested data may still result in non-trivial memory
94/// usage.
95///
96/// See Also:
97/// * [`ArrowWriter::memory_size`]: the current memory usage of the writer.
99/// * [`ArrowWriter::in_progress_size`]: the estimated encoded size of the buffered row group.
99///
100/// Call [`Self::flush`] to trigger an early flush of a row group based on a
101/// memory threshold and/or global memory pressure. However, smaller row groups
102/// result in higher metadata overhead, and may therefore worsen compression ratios
103/// and query performance.
104///
105/// ```no_run
106/// # use std::io::Write;
107/// # use arrow_array::RecordBatch;
108/// # use parquet::arrow::ArrowWriter;
109/// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
110/// # let batch: RecordBatch = todo!();
111/// writer.write(&batch).unwrap();
112/// // Trigger an early flush if anticipated size exceeds 1_000_000
113/// if writer.in_progress_size() > 1_000_000 {
114///     writer.flush().unwrap();
115/// }
116/// ```
117///
118/// ## Type Support
119///
120/// The writer supports writing all Arrow [`DataType`]s that have a direct mapping to
121/// Parquet types, including [`StructArray`] and [`ListArray`].
122///
123/// The following are not supported:
124///
125/// * [`IntervalMonthDayNanoArray`]: Parquet does not [support nanosecond intervals].
126///
127/// [`DataType`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html
128/// [`StructArray`]: https://docs.rs/arrow/latest/arrow/array/struct.StructArray.html
129/// [`ListArray`]: https://docs.rs/arrow/latest/arrow/array/type.ListArray.html
130/// [`IntervalMonthDayNanoArray`]: https://docs.rs/arrow/latest/arrow/array/type.IntervalMonthDayNanoArray.html
131/// [support nanosecond intervals]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
132///
133/// ## Type Compatibility
134/// The writer can write Arrow [`RecordBatch`]es whose schemas are logically equivalent to its
135/// own. This means that, for a given column, the writer can accept multiple Arrow
136/// [`DataType`]s that contain the same value type.
137///
138/// For example, the following [`DataType`]s are all logically equivalent and can be written
139/// to the same column:
140/// * String, LargeString, StringView
141/// * Binary, LargeBinary, BinaryView
142///
143/// The writer will also accept both native and dictionary-encoded arrays, provided the
144/// dictionaries contain compatible values.
145/// ```
146/// # use std::sync::Arc;
147/// # use arrow_array::{DictionaryArray, LargeStringArray, RecordBatch, StringArray, UInt8Array};
148/// # use arrow_schema::{DataType, Field, Schema};
149/// # use parquet::arrow::arrow_writer::ArrowWriter;
150/// let record_batch1 = RecordBatch::try_new(
151///    Arc::new(Schema::new(vec![Field::new("col", DataType::LargeUtf8, false)])),
152///    vec![Arc::new(LargeStringArray::from_iter_values(vec!["a", "b"]))]
153///  )
154/// .unwrap();
155///
156/// let mut buffer = Vec::new();
157/// let mut writer = ArrowWriter::try_new(&mut buffer, record_batch1.schema(), None).unwrap();
158/// writer.write(&record_batch1).unwrap();
159///
160/// let record_batch2 = RecordBatch::try_new(
161///     Arc::new(Schema::new(vec![Field::new(
162///         "col",
163///         DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
164///          false,
165///     )])),
166///     vec![Arc::new(DictionaryArray::new(
167///          UInt8Array::from_iter_values(vec![0, 1]),
168///          Arc::new(StringArray::from_iter_values(vec!["b", "c"])),
169///      ))],
170///  )
171///  .unwrap();
172///  writer.write(&record_batch2).unwrap();
173///  writer.close().unwrap();
174/// ```
175pub struct ArrowWriter<W: Write> {
176    /// Underlying Parquet writer
177    writer: SerializedFileWriter<W>,
178
179    /// The in-progress row group if any
180    in_progress: Option<ArrowRowGroupWriter>,
181
182    /// A copy of the Arrow schema.
183    ///
184    /// The schema is used to verify that each record batch written has the correct schema
185    arrow_schema: SchemaRef,
186
187    /// Creates new [`ArrowRowGroupWriter`] instances as required
188    row_group_writer_factory: ArrowRowGroupWriterFactory,
189
190    /// The length of arrays to write to each row group
191    max_row_group_size: usize,
192}
193
194impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
195    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
196        let buffered_memory = self.in_progress_size();
197        f.debug_struct("ArrowWriter")
198            .field("writer", &self.writer)
199            .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
200            .field("in_progress_rows", &self.in_progress_rows())
201            .field("arrow_schema", &self.arrow_schema)
202            .field("max_row_group_size", &self.max_row_group_size)
203            .finish()
204    }
205}
206
207impl<W: Write + Send> ArrowWriter<W> {
208    /// Try to create a new Arrow writer
209    ///
210    /// The writer will fail if:
211    ///  * a `SerializedFileWriter` cannot be created from the provided writer
212    ///  * the Arrow schema contains unsupported data types such as Unions
213    pub fn try_new(
214        writer: W,
215        arrow_schema: SchemaRef,
216        props: Option<WriterProperties>,
217    ) -> Result<Self> {
218        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
219        Self::try_new_with_options(writer, arrow_schema, options)
220    }
221
222    /// Try to create a new Arrow writer with [`ArrowWriterOptions`].
223    ///
224    /// The writer will fail if:
225    ///  * a `SerializedFileWriter` cannot be created from the provided writer
226    ///  * the Arrow schema contains unsupported data types such as Unions
227    pub fn try_new_with_options(
228        writer: W,
229        arrow_schema: SchemaRef,
230        options: ArrowWriterOptions,
231    ) -> Result<Self> {
232        let mut props = options.properties;
233
234        let schema = if let Some(parquet_schema) = options.schema_descr {
235            parquet_schema.clone()
236        } else {
237            let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
238            if let Some(schema_root) = &options.schema_root {
239                converter = converter.schema_root(schema_root);
240            }
241
242            converter.convert(&arrow_schema)?
243        };
244
245        if !options.skip_arrow_metadata {
246            // add serialized arrow schema
247            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
248        }
249
250        let max_row_group_size = props.max_row_group_size();
251
252        let props_ptr = Arc::new(props);
253        let file_writer =
254            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;
255
256        let row_group_writer_factory =
257            ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone());
258
259        Ok(Self {
260            writer: file_writer,
261            in_progress: None,
262            arrow_schema,
263            row_group_writer_factory,
264            max_row_group_size,
265        })
266    }
267
268    /// Returns metadata for any flushed row groups
269    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
270        self.writer.flushed_row_groups()
271    }
272
273    /// Estimated memory usage, in bytes, of this `ArrowWriter`
274    ///
275    /// This estimate is formed by summing the values of
276    /// [`ArrowColumnWriter::memory_size`] for all in-progress columns.
277    pub fn memory_size(&self) -> usize {
278        match &self.in_progress {
279            Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
280            None => 0,
281        }
282    }
283
284    /// Anticipated encoded size of the in-progress row group.
285    ///
286    /// This estimates the size of the row group after it has been completely
287    /// encoded, and is formed by summing the values of
288    /// [`ArrowColumnWriter::get_estimated_total_bytes`] for all in-progress
289    /// columns.
290    pub fn in_progress_size(&self) -> usize {
291        match &self.in_progress {
292            Some(in_progress) => in_progress
293                .writers
294                .iter()
295                .map(|x| x.get_estimated_total_bytes())
296                .sum(),
297            None => 0,
298        }
299    }
300
301    /// Returns the number of rows buffered in the in-progress row group
302    pub fn in_progress_rows(&self) -> usize {
303        self.in_progress
304            .as_ref()
305            .map(|x| x.buffered_rows)
306            .unwrap_or_default()
307    }
308
309    /// Returns the number of bytes written by this instance
310    pub fn bytes_written(&self) -> usize {
311        self.writer.bytes_written()
312    }
313
314    /// Encodes the provided [`RecordBatch`]
315    ///
316    /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`]
317    /// rows, the contents of `batch` will be written to one or more row groups such that all but
318    /// the final row group in the file contain [`WriterProperties::max_row_group_size`] rows.
319    ///
320    /// This will fail if the `batch`'s schema does not match the writer's schema.
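    ///
    /// # Example: splitting into multiple row groups
    ///
    /// A minimal sketch of the splitting behaviour; the 10 row limit and the
    /// `Int32Array` column are purely illustrative:
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    /// # use parquet::arrow::ArrowWriter;
    /// # use parquet::file::properties::WriterProperties;
    /// let col = Arc::new(Int32Array::from_iter_values(0..25)) as ArrayRef;
    /// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
    ///
    /// let props = WriterProperties::builder()
    ///     .set_max_row_group_size(10)
    ///     .build();
    /// let mut buffer = Vec::new();
    /// let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
    /// writer.write(&batch).unwrap();
    ///
    /// // 25 rows with a limit of 10 produce row groups of 10, 10 and 5 rows
    /// let metadata = writer.close().unwrap();
    /// assert_eq!(metadata.num_row_groups(), 3);
    /// ```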
321    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
322        if batch.num_rows() == 0 {
323            return Ok(());
324        }
325
326        let in_progress = match &mut self.in_progress {
327            Some(in_progress) => in_progress,
328            x => x.insert(
329                self.row_group_writer_factory
330                    .create_row_group_writer(self.writer.flushed_row_groups().len())?,
331            ),
332        };
333
334        // If this would exceed max_row_group_size, split the batch
335        if in_progress.buffered_rows + batch.num_rows() > self.max_row_group_size {
336            let to_write = self.max_row_group_size - in_progress.buffered_rows;
337            let a = batch.slice(0, to_write);
338            let b = batch.slice(to_write, batch.num_rows() - to_write);
339            self.write(&a)?;
340            return self.write(&b);
341        }
342
343        in_progress.write(batch)?;
344
345        if in_progress.buffered_rows >= self.max_row_group_size {
346            self.flush()?
347        }
348        Ok(())
349    }
350
351    /// Writes the given bytes to the internal buffer.
352    ///
353    /// It is safe to use this method to write data to the underlying writer,
354    /// because it goes through the buffering and byte-counting layers.
355    pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
356        self.writer.write_all(buf)
357    }
358
359    /// Flushes the underlying writer
360    pub fn sync(&mut self) -> std::io::Result<()> {
361        self.writer.flush()
362    }
363
364    /// Flushes all buffered rows into a new row group
365    ///
366    /// Note that the underlying writer is not flushed by this call.
367    /// If that is desired, also call [`ArrowWriter::sync`].
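    ///
    /// A minimal sketch combining the two calls (the writer is assumed to exist already):
    /// ```no_run
    /// # use parquet::arrow::ArrowWriter;
    /// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
    /// // cut the currently buffered rows into a new row group
    /// writer.flush().unwrap();
    /// // then flush the underlying writer itself
    /// writer.sync().unwrap();
    /// ```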
368    pub fn flush(&mut self) -> Result<()> {
369        let in_progress = match self.in_progress.take() {
370            Some(in_progress) => in_progress,
371            None => return Ok(()),
372        };
373
374        let mut row_group_writer = self.writer.next_row_group()?;
375        for chunk in in_progress.close()? {
376            chunk.append_to_row_group(&mut row_group_writer)?;
377        }
378        row_group_writer.close()?;
379        Ok(())
380    }
381
382    /// Appends [`KeyValue`] metadata in addition to that from [`WriterProperties`]
383    ///
384    /// This method provides a way to append key-value metadata after writing `RecordBatch`es.
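    ///
    /// A minimal sketch; the key and value shown are illustrative and this assumes
    /// `KeyValue::new` taking a key and a value:
    /// ```no_run
    /// # use parquet::arrow::ArrowWriter;
    /// # use parquet::file::metadata::KeyValue;
    /// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
    /// writer.append_key_value_metadata(KeyValue::new("my_key".to_string(), "my_value".to_string()));
    /// ```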
385    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
386        self.writer.append_key_value_metadata(kv_metadata)
387    }
388
389    /// Returns a reference to the underlying writer.
390    pub fn inner(&self) -> &W {
391        self.writer.inner()
392    }
393
394    /// Returns a mutable reference to the underlying writer.
395    ///
396    /// **Warning**: if you write directly to this writer, you will skip
397    /// the `TrackedWrite` buffering and byte-counting layers. This will cause
398    /// the offsets and sizes recorded in the file footer to diverge from reality,
399    /// resulting in an unreadable or corrupted Parquet file.
400    ///
401    /// If you want to write safely to the underlying writer, use [`Self::write_all`].
402    pub fn inner_mut(&mut self) -> &mut W {
403        self.writer.inner_mut()
404    }
405
406    /// Flushes any outstanding data and returns the underlying writer.
407    pub fn into_inner(mut self) -> Result<W> {
408        self.flush()?;
409        self.writer.into_inner()
410    }
411
412    /// Close and finalize the underlying Parquet writer
413    ///
414    /// Unlike [`Self::close`] this does not consume self
415    ///
416    /// Attempting to write after calling finish will result in an error
417    pub fn finish(&mut self) -> Result<ParquetMetaData> {
418        self.flush()?;
419        self.writer.finish()
420    }
421
422    /// Close and finalize the underlying Parquet writer
423    pub fn close(mut self) -> Result<ParquetMetaData> {
424        self.finish()
425    }
426
427    /// Create a new row group writer and return its column writers.
428    #[deprecated(
429        since = "56.2.0",
430        note = "Use `ArrowRowGroupWriterFactory` instead, see `ArrowColumnWriter` for an example"
431    )]
432    pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
433        self.flush()?;
434        let in_progress = self
435            .row_group_writer_factory
436            .create_row_group_writer(self.writer.flushed_row_groups().len())?;
437        Ok(in_progress.writers)
438    }
439
440    /// Append the given column chunks to the file as a new row group.
441    #[deprecated(
442        since = "56.2.0",
443        note = "Use `SerializedFileWriter` directly instead, see `ArrowColumnWriter` for an example"
444    )]
445    pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
446        let mut row_group_writer = self.writer.next_row_group()?;
447        for chunk in chunks {
448            chunk.append_to_row_group(&mut row_group_writer)?;
449        }
450        row_group_writer.close()?;
451        Ok(())
452    }
453
454    /// Converts this writer into a lower-level [`SerializedFileWriter`] and [`ArrowRowGroupWriterFactory`].
455    ///
456    /// Flushes any outstanding data before returning.
457    ///
458    /// This can be useful to provide more control over how files are written, for example
459    /// to write columns in parallel. See the example on [`ArrowColumnWriter`].
460    pub fn into_serialized_writer(
461        mut self,
462    ) -> Result<(SerializedFileWriter<W>, ArrowRowGroupWriterFactory)> {
463        self.flush()?;
464        Ok((self.writer, self.row_group_writer_factory))
465    }
466}
467
468impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
469    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
470        self.write(batch).map_err(|e| e.into())
471    }
472
473    fn close(self) -> std::result::Result<(), ArrowError> {
474        self.close()?;
475        Ok(())
476    }
477}
478
479/// Arrow-specific configuration settings for writing parquet files.
480///
481/// See [`ArrowWriter`] for how to configure the writer.
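///
/// # Example
///
/// A minimal sketch of configuring an [`ArrowWriter`] through options; the
/// particular settings are illustrative:
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
/// # use parquet::arrow::arrow_writer::{ArrowWriter, ArrowWriterOptions};
/// # use parquet::file::properties::WriterProperties;
/// let options = ArrowWriterOptions::new()
///     .with_properties(WriterProperties::default())
///     .with_skip_arrow_metadata(true)
///     .with_schema_root("schema".to_string());
///
/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
/// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
///
/// let mut buffer = Vec::new();
/// let mut writer = ArrowWriter::try_new_with_options(&mut buffer, batch.schema(), options).unwrap();
/// writer.write(&batch).unwrap();
/// writer.close().unwrap();
/// ```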
482#[derive(Debug, Clone, Default)]
483pub struct ArrowWriterOptions {
484    properties: WriterProperties,
485    skip_arrow_metadata: bool,
486    schema_root: Option<String>,
487    schema_descr: Option<SchemaDescriptor>,
488}
489
490impl ArrowWriterOptions {
491    /// Creates a new [`ArrowWriterOptions`] with the default settings.
492    pub fn new() -> Self {
493        Self::default()
494    }
495
496    /// Sets the [`WriterProperties`] for writing parquet files.
497    pub fn with_properties(self, properties: WriterProperties) -> Self {
498        Self { properties, ..self }
499    }
500
501    /// Skip encoding the embedded arrow metadata (defaults to `false`)
502    ///
503    /// Parquet files generated by the [`ArrowWriter`] contain the embedded Arrow schema
504    /// by default.
505    ///
506    /// Set `skip_arrow_metadata` to `true` to skip encoding the embedded metadata.
507    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
508        Self {
509            skip_arrow_metadata,
510            ..self
511        }
512    }
513
514    /// Set the name of the root parquet schema element (defaults to `"arrow_schema"`)
515    pub fn with_schema_root(self, schema_root: String) -> Self {
516        Self {
517            schema_root: Some(schema_root),
518            ..self
519        }
520    }
521
522    /// Explicitly specify the Parquet schema to be used
523    ///
524    /// If omitted (the default), the [`ArrowSchemaConverter`] is used to compute the
525    /// Parquet [`SchemaDescriptor`]. This may be used when the [`SchemaDescriptor`] is
526    /// already known or must be calculated using custom logic.
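    ///
    /// A minimal sketch that converts an Arrow schema up front and reuses the result:
    /// ```
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::ArrowSchemaConverter;
    /// # use parquet::arrow::arrow_writer::ArrowWriterOptions;
    /// let arrow_schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
    /// let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema).unwrap();
    /// let options = ArrowWriterOptions::new().with_parquet_schema(parquet_schema);
    /// ```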
527    pub fn with_parquet_schema(self, schema_descr: SchemaDescriptor) -> Self {
528        Self {
529            schema_descr: Some(schema_descr),
530            ..self
531        }
532    }
533}
534
535/// A single column chunk produced by [`ArrowColumnWriter`]
536#[derive(Default)]
537struct ArrowColumnChunkData {
538    length: usize,
539    data: Vec<Bytes>,
540}
541
542impl Length for ArrowColumnChunkData {
543    fn len(&self) -> u64 {
544        self.length as _
545    }
546}
547
548impl ChunkReader for ArrowColumnChunkData {
549    type T = ArrowColumnChunkReader;
550
551    fn get_read(&self, start: u64) -> Result<Self::T> {
552        assert_eq!(start, 0); // Assume append_column writes all data in one-shot
553        Ok(ArrowColumnChunkReader(
554            self.data.clone().into_iter().peekable(),
555        ))
556    }
557
558    fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
559        unimplemented!()
560    }
561}
562
563/// A [`Read`] for [`ArrowColumnChunkData`]
564struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
565
566impl Read for ArrowColumnChunkReader {
567    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
568        let buffer = loop {
569            match self.0.peek_mut() {
570                Some(b) if b.is_empty() => {
571                    self.0.next();
572                    continue;
573                }
574                Some(b) => break b,
575                None => return Ok(0),
576            }
577        };
578
579        let len = buffer.len().min(out.len());
580        let b = buffer.split_to(len);
581        out[..len].copy_from_slice(&b);
582        Ok(len)
583    }
584}
585
586/// A shared [`ArrowColumnChunkData`]
587///
588/// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access via
589/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows
590type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
591
592#[derive(Default)]
593struct ArrowPageWriter {
594    buffer: SharedColumnChunk,
595    #[cfg(feature = "encryption")]
596    page_encryptor: Option<PageEncryptor>,
597}
598
599impl ArrowPageWriter {
600    #[cfg(feature = "encryption")]
601    pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
602        self.page_encryptor = page_encryptor;
603        self
604    }
605
606    #[cfg(feature = "encryption")]
607    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
608        self.page_encryptor.as_mut()
609    }
610
611    #[cfg(not(feature = "encryption"))]
612    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
613        None
614    }
615}
616
617impl PageWriter for ArrowPageWriter {
618    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
619        let page = match self.page_encryptor_mut() {
620            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
621            None => page,
622        };
623
624        let page_header = page.to_thrift_header()?;
625        let header = {
626            let mut header = Vec::with_capacity(1024);
627
628            match self.page_encryptor_mut() {
629                Some(page_encryptor) => {
630                    page_encryptor.encrypt_page_header(&page_header, &mut header)?;
631                    if page.compressed_page().is_data_page() {
632                        page_encryptor.increment_page();
633                    }
634                }
635                None => {
636                    let mut protocol = ThriftCompactOutputProtocol::new(&mut header);
637                    page_header.write_thrift(&mut protocol)?;
638                }
639            };
640
641            Bytes::from(header)
642        };
643
644        let mut buf = self.buffer.try_lock().unwrap();
645
646        let data = page.compressed_page().buffer().clone();
647        let compressed_size = data.len() + header.len();
648
649        let mut spec = PageWriteSpec::new();
650        spec.page_type = page.page_type();
651        spec.num_values = page.num_values();
652        spec.uncompressed_size = page.uncompressed_size() + header.len();
653        spec.offset = buf.length as u64;
654        spec.compressed_size = compressed_size;
655        spec.bytes_written = compressed_size as u64;
656
657        buf.length += compressed_size;
658        buf.data.push(header);
659        buf.data.push(data);
660
661        Ok(spec)
662    }
663
664    fn close(&mut self) -> Result<()> {
665        Ok(())
666    }
667}
668
669/// A leaf column that can be encoded by [`ArrowColumnWriter`]
670#[derive(Debug)]
671pub struct ArrowLeafColumn(ArrayLevels);
672
673/// Computes the [`ArrowLeafColumn`] for a potentially nested [`ArrayRef`]
674///
675/// This function can be used along with [`get_column_writers`] to encode
676/// individual columns in parallel. See example on [`ArrowColumnWriter`]
677pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
678    let levels = calculate_array_levels(array, field)?;
679    Ok(levels.into_iter().map(ArrowLeafColumn).collect())
680}
681
682/// The data for a single column chunk, see [`ArrowColumnWriter`]
683pub struct ArrowColumnChunk {
684    data: ArrowColumnChunkData,
685    close: ColumnCloseResult,
686}
687
688impl std::fmt::Debug for ArrowColumnChunk {
689    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
690        f.debug_struct("ArrowColumnChunk")
691            .field("length", &self.data.length)
692            .finish_non_exhaustive()
693    }
694}
695
696impl ArrowColumnChunk {
697    /// Calls [`SerializedRowGroupWriter::append_column`] with this column's data
698    pub fn append_to_row_group<W: Write + Send>(
699        self,
700        writer: &mut SerializedRowGroupWriter<'_, W>,
701    ) -> Result<()> {
702        writer.append_column(&self.data, self.close)
703    }
704}
705
706/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
707///
708/// `ArrowColumnWriter` instances can be created using an [`ArrowRowGroupWriterFactory`].
709///
710/// Note: This is a low-level interface for applications that require
711/// fine-grained control of encoding (e.g. encoding using multiple threads),
712/// see [`ArrowWriter`] for a higher-level interface
713///
714/// # Example: Encoding two Arrow Arrays in Parallel
715/// ```
716/// // The arrow schema
717/// # use std::sync::Arc;
718/// # use arrow_array::*;
719/// # use arrow_schema::*;
720/// # use parquet::arrow::ArrowSchemaConverter;
721/// # use parquet::arrow::arrow_writer::{compute_leaves, ArrowColumnChunk, ArrowLeafColumn, ArrowRowGroupWriterFactory};
722/// # use parquet::file::properties::WriterProperties;
723/// # use parquet::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
724/// #
725/// let schema = Arc::new(Schema::new(vec![
726///     Field::new("i32", DataType::Int32, false),
727///     Field::new("f32", DataType::Float32, false),
728/// ]));
729///
730/// // Compute the parquet schema
731/// let props = Arc::new(WriterProperties::default());
732/// let parquet_schema = ArrowSchemaConverter::new()
733///   .with_coerce_types(props.coerce_types())
734///   .convert(&schema)
735///   .unwrap();
736///
737/// // Create parquet writer
738/// let root_schema = parquet_schema.root_schema_ptr();
739/// // write to memory in the example, but this could be a File
740/// let mut out = Vec::with_capacity(1024);
741/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone())
742///   .unwrap();
743///
744/// // Create a factory for building Arrow column writers
745/// let row_group_factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(&schema));
746/// // Create column writers for the 0th row group
747/// let col_writers = row_group_factory.create_column_writers(0).unwrap();
748///
749/// // Spawn a worker thread for each column
750/// //
751/// // Note: This is for demonstration purposes; a thread pool (e.g. rayon or tokio) would be better.
752/// // The `map` produces an iterator of `(thread handle, send channel)` tuples.
753/// let mut workers: Vec<_> = col_writers
754///     .into_iter()
755///     .map(|mut col_writer| {
756///         let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
757///         let handle = std::thread::spawn(move || {
758///             // receive Arrays to encode via the channel
759///             for col in recv {
760///                 col_writer.write(&col)?;
761///             }
762///             // once the input is complete, close the writer
763///             // to return the newly created ArrowColumnChunk
764///             col_writer.close()
765///         });
766///         (handle, send)
767///     })
768///     .collect();
769///
770/// // Start row group
771/// let mut row_group_writer: SerializedRowGroupWriter<'_, _> = writer
772///   .next_row_group()
773///   .unwrap();
774///
775/// // Create some example input columns to encode
776/// let to_write = vec![
777///     Arc::new(Int32Array::from_iter_values([1, 2, 3])) as _,
778///     Arc::new(Float32Array::from_iter_values([1., 45., -1.])) as _,
779/// ];
780///
781/// // Send the input columns to the workers
782/// let mut worker_iter = workers.iter_mut();
783/// for (arr, field) in to_write.iter().zip(&schema.fields) {
784///     for leaves in compute_leaves(field, arr).unwrap() {
785///         worker_iter.next().unwrap().1.send(leaves).unwrap();
786///     }
787/// }
788///
789/// // Wait for the workers to complete encoding, and append
790/// // the resulting column chunks to the row group (and the file)
791/// for (handle, send) in workers {
792///     drop(send); // Drop send side to signal termination
793///     // wait for the worker to send the completed chunk
794///     let chunk: ArrowColumnChunk = handle.join().unwrap().unwrap();
795///     chunk.append_to_row_group(&mut row_group_writer).unwrap();
796/// }
797/// // Close the row group which writes to the underlying file
798/// row_group_writer.close().unwrap();
799///
800/// let metadata = writer.close().unwrap();
801/// assert_eq!(metadata.file_metadata().num_rows(), 3);
802/// ```
803pub struct ArrowColumnWriter {
804    writer: ArrowColumnWriterImpl,
805    chunk: SharedColumnChunk,
806}
807
808impl std::fmt::Debug for ArrowColumnWriter {
809    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
810        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
811    }
812}
813
814enum ArrowColumnWriterImpl {
815    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
816    Column(ColumnWriter<'static>),
817}
818
819impl ArrowColumnWriter {
820    /// Write an [`ArrowLeafColumn`]
821    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
822        match &mut self.writer {
823            ArrowColumnWriterImpl::Column(c) => {
824                let leaf = col.0.array();
825                match leaf.as_any_dictionary_opt() {
826                    Some(dictionary) => {
827                        let materialized =
828                            arrow_select::take::take(dictionary.values(), dictionary.keys(), None)?;
829                        write_leaf(c, &materialized, &col.0)?
830                    }
831                    None => write_leaf(c, leaf, &col.0)?,
832                };
833            }
834            ArrowColumnWriterImpl::ByteArray(c) => {
835                write_primitive(c, col.0.array().as_ref(), &col.0)?;
836            }
837        }
838        Ok(())
839    }
840
841    /// Close this column returning the written [`ArrowColumnChunk`]
842    pub fn close(self) -> Result<ArrowColumnChunk> {
843        let close = match self.writer {
844            ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
845            ArrowColumnWriterImpl::Column(c) => c.close()?,
846        };
847        let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
848        let data = chunk.into_inner().unwrap();
849        Ok(ArrowColumnChunk { data, close })
850    }
851
852    /// Returns the estimated total memory usage by the writer.
853    ///
854    /// Unlike [`Self::get_estimated_total_bytes`], this is an estimate
855    /// of the current memory usage and not its anticipated encoded size.
856    ///
857    /// This includes:
858    /// 1. Data buffered in encoded form
859    /// 2. Data buffered in un-encoded form (e.g. `usize` dictionary keys)
860    ///
861    /// This value should be greater than or equal to [`Self::get_estimated_total_bytes`]
862    pub fn memory_size(&self) -> usize {
863        match &self.writer {
864            ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
865            ArrowColumnWriterImpl::Column(c) => c.memory_size(),
866        }
867    }
868
869    /// Returns the estimated total encoded bytes for this column writer.
870    ///
871    /// This includes:
872    /// 1. Data buffered in encoded form
873    /// 2. An estimate of how large the data buffered in un-encoded form would be once encoded
874    ///
875    /// This value should be less than or equal to [`Self::memory_size`]
876    pub fn get_estimated_total_bytes(&self) -> usize {
877        match &self.writer {
878            ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
879            ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
880        }
881    }
882}
883
884/// Encodes [`RecordBatch`] to a parquet row group
885///
886/// Note: this structure is created internally by [`ArrowRowGroupWriterFactory`]
887/// and is not exposed publicly.
888///
889/// See the example on [`ArrowColumnWriter`] for how to encode columns in parallel
890#[derive(Debug)]
891struct ArrowRowGroupWriter {
892    writers: Vec<ArrowColumnWriter>,
893    schema: SchemaRef,
894    buffered_rows: usize,
895}
896
897impl ArrowRowGroupWriter {
898    fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
899        Self {
900            writers,
901            schema: arrow.clone(),
902            buffered_rows: 0,
903        }
904    }
905
906    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
907        self.buffered_rows += batch.num_rows();
908        let mut writers = self.writers.iter_mut();
909        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
910            for leaf in compute_leaves(field.as_ref(), column)? {
911                writers.next().unwrap().write(&leaf)?
912            }
913        }
914        Ok(())
915    }
916
917    fn close(self) -> Result<Vec<ArrowColumnChunk>> {
918        self.writers
919            .into_iter()
920            .map(|writer| writer.close())
921            .collect()
922    }
923}
924
925/// Factory that creates new column writers for each row group in the Parquet file.
926///
927/// You can create this structure via [`ArrowRowGroupWriterFactory::new`] or [`ArrowWriter::into_serialized_writer`].
928/// See the example on [`ArrowColumnWriter`] for how to encode columns in parallel
929#[derive(Debug)]
930pub struct ArrowRowGroupWriterFactory {
931    schema: SchemaDescPtr,
932    arrow_schema: SchemaRef,
933    props: WriterPropertiesPtr,
934    #[cfg(feature = "encryption")]
935    file_encryptor: Option<Arc<FileEncryptor>>,
936}
937
938impl ArrowRowGroupWriterFactory {
939    /// Create a new [`ArrowRowGroupWriterFactory`] for the provided file writer and Arrow schema
940    pub fn new<W: Write + Send>(
941        file_writer: &SerializedFileWriter<W>,
942        arrow_schema: SchemaRef,
943    ) -> Self {
944        let schema = Arc::clone(file_writer.schema_descr_ptr());
945        let props = Arc::clone(file_writer.properties());
946        Self {
947            schema,
948            arrow_schema,
949            props,
950            #[cfg(feature = "encryption")]
951            file_encryptor: file_writer.file_encryptor(),
952        }
953    }
954
955    fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
956        let writers = self.create_column_writers(row_group_index)?;
957        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
958    }
959
960    /// Create column writers for a new row group, with the given row group index
961    pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
962        let mut writers = Vec::with_capacity(self.arrow_schema.fields.len());
963        let mut leaves = self.schema.columns().iter();
964        let column_factory = self.column_writer_factory(row_group_index);
965        for field in &self.arrow_schema.fields {
966            column_factory.get_arrow_column_writer(
967                field.data_type(),
968                &self.props,
969                &mut leaves,
970                &mut writers,
971            )?;
972        }
973        Ok(writers)
974    }
975
976    #[cfg(feature = "encryption")]
977    fn column_writer_factory(&self, row_group_idx: usize) -> ArrowColumnWriterFactory {
978        ArrowColumnWriterFactory::new()
979            .with_file_encryptor(row_group_idx, self.file_encryptor.clone())
980    }
981
982    #[cfg(not(feature = "encryption"))]
983    fn column_writer_factory(&self, _row_group_idx: usize) -> ArrowColumnWriterFactory {
984        ArrowColumnWriterFactory::new()
985    }
986}
987
988/// Returns [`ArrowColumnWriter`]s for each column in a given schema
989#[deprecated(since = "57.0.0", note = "Use `ArrowRowGroupWriterFactory` instead")]
990pub fn get_column_writers(
991    parquet: &SchemaDescriptor,
992    props: &WriterPropertiesPtr,
993    arrow: &SchemaRef,
994) -> Result<Vec<ArrowColumnWriter>> {
995    let mut writers = Vec::with_capacity(arrow.fields.len());
996    let mut leaves = parquet.columns().iter();
997    let column_factory = ArrowColumnWriterFactory::new();
998    for field in &arrow.fields {
999        column_factory.get_arrow_column_writer(
1000            field.data_type(),
1001            props,
1002            &mut leaves,
1003            &mut writers,
1004        )?;
1005    }
1006    Ok(writers)
1007}
1008
1009/// Creates [`ArrowColumnWriter`] instances
1010struct ArrowColumnWriterFactory {
1011    #[cfg(feature = "encryption")]
1012    row_group_index: usize,
1013    #[cfg(feature = "encryption")]
1014    file_encryptor: Option<Arc<FileEncryptor>>,
1015}
1016
1017impl ArrowColumnWriterFactory {
1018    pub fn new() -> Self {
1019        Self {
1020            #[cfg(feature = "encryption")]
1021            row_group_index: 0,
1022            #[cfg(feature = "encryption")]
1023            file_encryptor: None,
1024        }
1025    }
1026
1027    #[cfg(feature = "encryption")]
1028    pub fn with_file_encryptor(
1029        mut self,
1030        row_group_index: usize,
1031        file_encryptor: Option<Arc<FileEncryptor>>,
1032    ) -> Self {
1033        self.row_group_index = row_group_index;
1034        self.file_encryptor = file_encryptor;
1035        self
1036    }
1037
1038    #[cfg(feature = "encryption")]
1039    fn create_page_writer(
1040        &self,
1041        column_descriptor: &ColumnDescPtr,
1042        column_index: usize,
1043    ) -> Result<Box<ArrowPageWriter>> {
1044        let column_path = column_descriptor.path().string();
1045        let page_encryptor = PageEncryptor::create_if_column_encrypted(
1046            &self.file_encryptor,
1047            self.row_group_index,
1048            column_index,
1049            &column_path,
1050        )?;
1051        Ok(Box::new(
1052            ArrowPageWriter::default().with_encryptor(page_encryptor),
1053        ))
1054    }
1055
1056    #[cfg(not(feature = "encryption"))]
1057    fn create_page_writer(
1058        &self,
1059        _column_descriptor: &ColumnDescPtr,
1060        _column_index: usize,
1061    ) -> Result<Box<ArrowPageWriter>> {
1062        Ok(Box::<ArrowPageWriter>::default())
1063    }
1064
1065    /// Gets an [`ArrowColumnWriter`] for the given `data_type`, consuming the corresponding
1066    /// `ColumnDescPtr`s from `leaves` and appending the resulting column writers to `out`
1067    fn get_arrow_column_writer(
1068        &self,
1069        data_type: &ArrowDataType,
1070        props: &WriterPropertiesPtr,
1071        leaves: &mut Iter<'_, ColumnDescPtr>,
1072        out: &mut Vec<ArrowColumnWriter>,
1073    ) -> Result<()> {
1074        // Instantiate writers for normal columns
1075        let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1076            let page_writer = self.create_page_writer(desc, out.len())?;
1077            let chunk = page_writer.buffer.clone();
1078            let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
1079            Ok(ArrowColumnWriter {
1080                chunk,
1081                writer: ArrowColumnWriterImpl::Column(writer),
1082            })
1083        };
1084
1085        // Instantiate writers for byte arrays (e.g. Utf8, Binary, etc.)
1086        let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1087            let page_writer = self.create_page_writer(desc, out.len())?;
1088            let chunk = page_writer.buffer.clone();
1089            let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
1090            Ok(ArrowColumnWriter {
1091                chunk,
1092                writer: ArrowColumnWriterImpl::ByteArray(writer),
1093            })
1094        };
1095
1096        match data_type {
1097            _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
1098            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
1099                out.push(col(leaves.next().unwrap())?)
1100            }
1101            ArrowDataType::LargeBinary
1102            | ArrowDataType::Binary
1103            | ArrowDataType::Utf8
1104            | ArrowDataType::LargeUtf8
1105            | ArrowDataType::BinaryView
1106            | ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
1107            ArrowDataType::List(f)
1108            | ArrowDataType::LargeList(f)
1109            | ArrowDataType::FixedSizeList(f, _) => {
1110                self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
1111            }
1112            ArrowDataType::Struct(fields) => {
1113                for field in fields {
1114                    self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
1115                }
1116            }
1117            ArrowDataType::Map(f, _) => match f.data_type() {
1118                ArrowDataType::Struct(f) => {
1119                    self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
1120                    self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
1121                }
1122                _ => unreachable!("invalid map type"),
1123            },
1124            ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1125                ArrowDataType::Utf8
1126                | ArrowDataType::LargeUtf8
1127                | ArrowDataType::Binary
1128                | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
1129                ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
1130                    out.push(bytes(leaves.next().unwrap())?)
1131                }
1132                ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
1133                _ => out.push(col(leaves.next().unwrap())?),
1134            },
1135            _ => {
1136                return Err(ParquetError::NYI(format!(
1137                    "Attempting to write an Arrow type {data_type} to parquet that is not yet implemented"
1138                )));
1139            }
1140        }
1141        Ok(())
1142    }
1143}
1144
1145fn write_leaf(
1146    writer: &mut ColumnWriter<'_>,
1147    column: &dyn arrow_array::Array,
1148    levels: &ArrayLevels,
1149) -> Result<usize> {
1150    let indices = levels.non_null_indices();
1151
1152    match writer {
1153        // Note: this should match the contents of arrow_to_parquet_type
1154        ColumnWriter::Int32ColumnWriter(typed) => {
1155            match column.data_type() {
1156                ArrowDataType::Null => {
1157                    let array = Int32Array::new_null(column.len());
1158                    write_primitive(typed, array.values(), levels)
1159                }
1160                ArrowDataType::Int8 => {
1161                    let array: Int32Array = column.as_primitive::<Int8Type>().unary(|x| x as i32);
1162                    write_primitive(typed, array.values(), levels)
1163                }
1164                ArrowDataType::Int16 => {
1165                    let array: Int32Array = column.as_primitive::<Int16Type>().unary(|x| x as i32);
1166                    write_primitive(typed, array.values(), levels)
1167                }
1168                ArrowDataType::Int32 => {
1169                    write_primitive(typed, column.as_primitive::<Int32Type>().values(), levels)
1170                }
1171                ArrowDataType::UInt8 => {
1172                    let array: Int32Array = column.as_primitive::<UInt8Type>().unary(|x| x as i32);
1173                    write_primitive(typed, array.values(), levels)
1174                }
1175                ArrowDataType::UInt16 => {
1176                    let array: Int32Array = column.as_primitive::<UInt16Type>().unary(|x| x as i32);
1177                    write_primitive(typed, array.values(), levels)
1178                }
1179                ArrowDataType::UInt32 => {
1180                    // follow C++ implementation and use overflow/reinterpret cast from u32 to i32 which will map
1181                    // `(i32::MAX as u32)..u32::MAX` to `i32::MIN..0`
1182                    let array = column.as_primitive::<UInt32Type>();
1183                    write_primitive(typed, array.values().inner().typed_data(), levels)
1184                }
1185                ArrowDataType::Date32 => {
1186                    let array = column.as_primitive::<Date32Type>();
1187                    write_primitive(typed, array.values(), levels)
1188                }
1189                ArrowDataType::Time32(TimeUnit::Second) => {
1190                    let array = column.as_primitive::<Time32SecondType>();
1191                    write_primitive(typed, array.values(), levels)
1192                }
1193                ArrowDataType::Time32(TimeUnit::Millisecond) => {
1194                    let array = column.as_primitive::<Time32MillisecondType>();
1195                    write_primitive(typed, array.values(), levels)
1196                }
1197                ArrowDataType::Date64 => {
1198                    // If the column is a Date64 (milliseconds), truncate it to days
1199                    let array: Int32Array = column
1200                        .as_primitive::<Date64Type>()
1201                        .unary(|x| (x / 86_400_000) as _);
1202
1203                    write_primitive(typed, array.values(), levels)
1204                }
1205                ArrowDataType::Decimal32(_, _) => {
1206                    let array = column
1207                        .as_primitive::<Decimal32Type>()
1208                        .unary::<_, Int32Type>(|v| v);
1209                    write_primitive(typed, array.values(), levels)
1210                }
1211                ArrowDataType::Decimal64(_, _) => {
1212                    // use an int32 to represent the decimal with low precision
1213                    let array = column
1214                        .as_primitive::<Decimal64Type>()
1215                        .unary::<_, Int32Type>(|v| v as i32);
1216                    write_primitive(typed, array.values(), levels)
1217                }
1218                ArrowDataType::Decimal128(_, _) => {
1219                    // use an int32 to represent the decimal with low precision
1220                    let array = column
1221                        .as_primitive::<Decimal128Type>()
1222                        .unary::<_, Int32Type>(|v| v as i32);
1223                    write_primitive(typed, array.values(), levels)
1224                }
1225                ArrowDataType::Decimal256(_, _) => {
1226                    // use an int32 to represent the decimal with low precision
1227                    let array = column
1228                        .as_primitive::<Decimal256Type>()
1229                        .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1230                    write_primitive(typed, array.values(), levels)
1231                }
1232                d => Err(ParquetError::General(format!("Cannot coerce {d} to I32"))),
1233            }
1234        }
1235        ColumnWriter::BoolColumnWriter(typed) => {
1236            let array = column.as_boolean();
1237            typed.write_batch(
1238                get_bool_array_slice(array, indices).as_slice(),
1239                levels.def_levels(),
1240                levels.rep_levels(),
1241            )
1242        }
1243        ColumnWriter::Int64ColumnWriter(typed) => {
1244            match column.data_type() {
1245                ArrowDataType::Date64 => {
1246                    let array = column
1247                        .as_primitive::<Date64Type>()
1248                        .reinterpret_cast::<Int64Type>();
1249
1250                    write_primitive(typed, array.values(), levels)
1251                }
1252                ArrowDataType::Int64 => {
1253                    let array = column.as_primitive::<Int64Type>();
1254                    write_primitive(typed, array.values(), levels)
1255                }
1256                ArrowDataType::UInt64 => {
1257                    let values = column.as_primitive::<UInt64Type>().values();
1258                    // follow C++ implementation and use overflow/reinterpret cast from u64 to i64 which will map
1259                    // `(i64::MAX as u64)..u64::MAX` to `i64::MIN..0`
1260                    let array = values.inner().typed_data::<i64>();
1261                    write_primitive(typed, array, levels)
1262                }
1263                ArrowDataType::Time64(TimeUnit::Microsecond) => {
1264                    let array = column.as_primitive::<Time64MicrosecondType>();
1265                    write_primitive(typed, array.values(), levels)
1266                }
1267                ArrowDataType::Time64(TimeUnit::Nanosecond) => {
1268                    let array = column.as_primitive::<Time64NanosecondType>();
1269                    write_primitive(typed, array.values(), levels)
1270                }
1271                ArrowDataType::Timestamp(unit, _) => match unit {
1272                    TimeUnit::Second => {
1273                        let array = column.as_primitive::<TimestampSecondType>();
1274                        write_primitive(typed, array.values(), levels)
1275                    }
1276                    TimeUnit::Millisecond => {
1277                        let array = column.as_primitive::<TimestampMillisecondType>();
1278                        write_primitive(typed, array.values(), levels)
1279                    }
1280                    TimeUnit::Microsecond => {
1281                        let array = column.as_primitive::<TimestampMicrosecondType>();
1282                        write_primitive(typed, array.values(), levels)
1283                    }
1284                    TimeUnit::Nanosecond => {
1285                        let array = column.as_primitive::<TimestampNanosecondType>();
1286                        write_primitive(typed, array.values(), levels)
1287                    }
1288                },
1289                ArrowDataType::Duration(unit) => match unit {
1290                    TimeUnit::Second => {
1291                        let array = column.as_primitive::<DurationSecondType>();
1292                        write_primitive(typed, array.values(), levels)
1293                    }
1294                    TimeUnit::Millisecond => {
1295                        let array = column.as_primitive::<DurationMillisecondType>();
1296                        write_primitive(typed, array.values(), levels)
1297                    }
1298                    TimeUnit::Microsecond => {
1299                        let array = column.as_primitive::<DurationMicrosecondType>();
1300                        write_primitive(typed, array.values(), levels)
1301                    }
1302                    TimeUnit::Nanosecond => {
1303                        let array = column.as_primitive::<DurationNanosecondType>();
1304                        write_primitive(typed, array.values(), levels)
1305                    }
1306                },
1307                ArrowDataType::Decimal64(_, _) => {
1308                    let array = column
1309                        .as_primitive::<Decimal64Type>()
1310                        .reinterpret_cast::<Int64Type>();
1311                    write_primitive(typed, array.values(), levels)
1312                }
1313                ArrowDataType::Decimal128(_, _) => {
1314                    // use an int64 to represent the decimal with low precision
1315                    let array = column
1316                        .as_primitive::<Decimal128Type>()
1317                        .unary::<_, Int64Type>(|v| v as i64);
1318                    write_primitive(typed, array.values(), levels)
1319                }
1320                ArrowDataType::Decimal256(_, _) => {
1321                    // use an int64 to represent the decimal with low precision
1322                    let array = column
1323                        .as_primitive::<Decimal256Type>()
1324                        .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1325                    write_primitive(typed, array.values(), levels)
1326                }
1327                d => Err(ParquetError::General(format!("Cannot coerce {d} to I64"))),
1328            }
1329        }
1330        ColumnWriter::Int96ColumnWriter(_typed) => {
1331            unreachable!("Currently unreachable because data type not supported")
1332        }
1333        ColumnWriter::FloatColumnWriter(typed) => {
1334            let array = column.as_primitive::<Float32Type>();
1335            write_primitive(typed, array.values(), levels)
1336        }
1337        ColumnWriter::DoubleColumnWriter(typed) => {
1338            let array = column.as_primitive::<Float64Type>();
1339            write_primitive(typed, array.values(), levels)
1340        }
1341        ColumnWriter::ByteArrayColumnWriter(_) => {
1342            unreachable!("should use ByteArrayWriter")
1343        }
1344        ColumnWriter::FixedLenByteArrayColumnWriter(typed) => {
1345            let bytes = match column.data_type() {
1346                ArrowDataType::Interval(interval_unit) => match interval_unit {
1347                    IntervalUnit::YearMonth => {
1348                        let array = column.as_primitive::<IntervalYearMonthType>();
1349                        get_interval_ym_array_slice(array, indices)
1350                    }
1351                    IntervalUnit::DayTime => {
1352                        let array = column.as_primitive::<IntervalDayTimeType>();
1353                        get_interval_dt_array_slice(array, indices)
1354                    }
1355                    _ => {
1356                        return Err(ParquetError::NYI(format!(
1357                            "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
1358                        )));
1359                    }
1360                },
1361                ArrowDataType::FixedSizeBinary(_) => {
1362                    let array = column.as_fixed_size_binary();
1363                    get_fsb_array_slice(array, indices)
1364                }
1365                ArrowDataType::Decimal32(_, _) => {
1366                    let array = column.as_primitive::<Decimal32Type>();
1367                    get_decimal_32_array_slice(array, indices)
1368                }
1369                ArrowDataType::Decimal64(_, _) => {
1370                    let array = column.as_primitive::<Decimal64Type>();
1371                    get_decimal_64_array_slice(array, indices)
1372                }
1373                ArrowDataType::Decimal128(_, _) => {
1374                    let array = column.as_primitive::<Decimal128Type>();
1375                    get_decimal_128_array_slice(array, indices)
1376                }
1377                ArrowDataType::Decimal256(_, _) => {
1378                    let array = column.as_primitive::<Decimal256Type>();
1379                    get_decimal_256_array_slice(array, indices)
1380                }
1381                ArrowDataType::Float16 => {
1382                    let array = column.as_primitive::<Float16Type>();
1383                    get_float_16_array_slice(array, indices)
1384                }
1385                _ => {
1386                    return Err(ParquetError::NYI(
1387                        "Attempting to write an Arrow type that is not yet implemented".to_string(),
1388                    ));
1389                }
1390            };
1391            typed.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels())
1392        }
1393    }
1394}
1395
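// Shared helper for the fixed-width primitive match arms above: it forwards the dense
// Arrow values buffer together with the precomputed definition/repetition levels, and
// passes the non-null index list so that only valid slots of `values` are encoded.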
1396fn write_primitive<E: ColumnValueEncoder>(
1397    writer: &mut GenericColumnWriter<E>,
1398    values: &E::Values,
1399    levels: &ArrayLevels,
1400) -> Result<usize> {
1401    writer.write_batch_internal(
1402        values,
1403        Some(levels.non_null_indices()),
1404        levels.def_levels(),
1405        levels.rep_levels(),
1406        None,
1407        None,
1408        None,
1409    )
1410}
1411
1412fn get_bool_array_slice(array: &arrow_array::BooleanArray, indices: &[usize]) -> Vec<bool> {
1413    let mut values = Vec::with_capacity(indices.len());
1414    for i in indices {
1415        values.push(array.value(*i))
1416    }
1417    values
1418}
1419
1420/// Returns 12-byte values encoding months, days and milliseconds (4 bytes each).
1421/// An Arrow YearMonth interval only stores months, thus only the first 4 bytes are populated.
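///
/// For illustration, a sketch of the layout (little-endian months, followed by zeroed
/// days and milliseconds): a YearMonth value of 14 months becomes
/// `[0x0E, 0x00, 0x00, 0x00,  0x00, 0x00, 0x00, 0x00,  0x00, 0x00, 0x00, 0x00]`.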
1422fn get_interval_ym_array_slice(
1423    array: &arrow_array::IntervalYearMonthArray,
1424    indices: &[usize],
1425) -> Vec<FixedLenByteArray> {
1426    let mut values = Vec::with_capacity(indices.len());
1427    for i in indices {
1428        let mut value = array.value(*i).to_le_bytes().to_vec();
1429        let mut suffix = vec![0; 8];
1430        value.append(&mut suffix);
1431        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1432    }
1433    values
1434}
1435
1436/// Returns 12-byte values encoding months, days and milliseconds (4 bytes each).
1437/// An Arrow DayTime interval only stores days and millis, thus the first 4 bytes are not populated.
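///
/// For illustration, with the same 12-byte layout: an `IntervalDayTime` of 3 days and
/// -2 milliseconds becomes
/// `[0x00, 0x00, 0x00, 0x00,  0x03, 0x00, 0x00, 0x00,  0xFE, 0xFF, 0xFF, 0xFF]`.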
1438fn get_interval_dt_array_slice(
1439    array: &arrow_array::IntervalDayTimeArray,
1440    indices: &[usize],
1441) -> Vec<FixedLenByteArray> {
1442    let mut values = Vec::with_capacity(indices.len());
1443    for i in indices {
1444        let mut out = [0; 12];
1445        let value = array.value(*i);
1446        out[4..8].copy_from_slice(&value.days.to_le_bytes());
1447        out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1448        values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1449    }
1450    values
1451}
1452
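// The decimal helpers below all follow the same pattern: take the value's big-endian
// bytes and keep only the trailing `decimal_length_from_precision(precision)` bytes,
// i.e. the minimal two's-complement representation the declared precision requires.
// As a rough worked example (assuming `decimal_length_from_precision(5) == 3`): the
// i32 value 10_000 has big-endian bytes [0x00, 0x00, 0x27, 0x10], and a precision-5
// decimal keeps only the last three, [0x00, 0x27, 0x10].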
1453fn get_decimal_32_array_slice(
1454    array: &arrow_array::Decimal32Array,
1455    indices: &[usize],
1456) -> Vec<FixedLenByteArray> {
1457    let mut values = Vec::with_capacity(indices.len());
1458    let size = decimal_length_from_precision(array.precision());
1459    for i in indices {
1460        let as_be_bytes = array.value(*i).to_be_bytes();
1461        let resized_value = as_be_bytes[(4 - size)..].to_vec();
1462        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1463    }
1464    values
1465}
1466
1467fn get_decimal_64_array_slice(
1468    array: &arrow_array::Decimal64Array,
1469    indices: &[usize],
1470) -> Vec<FixedLenByteArray> {
1471    let mut values = Vec::with_capacity(indices.len());
1472    let size = decimal_length_from_precision(array.precision());
1473    for i in indices {
1474        let as_be_bytes = array.value(*i).to_be_bytes();
1475        let resized_value = as_be_bytes[(8 - size)..].to_vec();
1476        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1477    }
1478    values
1479}
1480
1481fn get_decimal_128_array_slice(
1482    array: &arrow_array::Decimal128Array,
1483    indices: &[usize],
1484) -> Vec<FixedLenByteArray> {
1485    let mut values = Vec::with_capacity(indices.len());
1486    let size = decimal_length_from_precision(array.precision());
1487    for i in indices {
1488        let as_be_bytes = array.value(*i).to_be_bytes();
1489        let resized_value = as_be_bytes[(16 - size)..].to_vec();
1490        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1491    }
1492    values
1493}
1494
1495fn get_decimal_256_array_slice(
1496    array: &arrow_array::Decimal256Array,
1497    indices: &[usize],
1498) -> Vec<FixedLenByteArray> {
1499    let mut values = Vec::with_capacity(indices.len());
1500    let size = decimal_length_from_precision(array.precision());
1501    for i in indices {
1502        let as_be_bytes = array.value(*i).to_be_bytes();
1503        let resized_value = as_be_bytes[(32 - size)..].to_vec();
1504        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1505    }
1506    values
1507}
1508
1509fn get_float_16_array_slice(
1510    array: &arrow_array::Float16Array,
1511    indices: &[usize],
1512) -> Vec<FixedLenByteArray> {
1513    let mut values = Vec::with_capacity(indices.len());
1514    for i in indices {
1515        let value = array.value(*i).to_le_bytes().to_vec();
1516        values.push(FixedLenByteArray::from(ByteArray::from(value)));
1517    }
1518    values
1519}
1520
1521fn get_fsb_array_slice(
1522    array: &arrow_array::FixedSizeBinaryArray,
1523    indices: &[usize],
1524) -> Vec<FixedLenByteArray> {
1525    let mut values = Vec::with_capacity(indices.len());
1526    for i in indices {
1527        let value = array.value(*i).to_vec();
1528        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1529    }
1530    values
1531}
1532
1533#[cfg(test)]
1534mod tests {
1535    use super::*;
1536    use std::collections::HashMap;
1537
1538    use std::fs::File;
1539
1540    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1541    use crate::arrow::{ARROW_SCHEMA_META_KEY, PARQUET_FIELD_ID_META_KEY};
1542    use crate::column::page::{Page, PageReader};
1543    use crate::file::metadata::thrift::PageHeader;
1544    use crate::file::page_index::column_index::ColumnIndexMetaData;
1545    use crate::file::reader::SerializedPageReader;
1546    use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
1547    use crate::schema::types::ColumnPath;
1548    use arrow::datatypes::ToByteSlice;
1549    use arrow::datatypes::{DataType, Schema};
1550    use arrow::error::Result as ArrowResult;
1551    use arrow::util::data_gen::create_random_array;
1552    use arrow::util::pretty::pretty_format_batches;
1553    use arrow::{array::*, buffer::Buffer};
1554    use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer, OffsetBuffer, i256};
1555    use arrow_schema::Fields;
1556    use half::f16;
1557    use num_traits::{FromPrimitive, ToPrimitive};
1558    use tempfile::tempfile;
1559
1560    use crate::basic::Encoding;
1561    use crate::data_type::AsBytes;
1562    use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
1563    use crate::file::properties::{
1564        BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1565    };
1566    use crate::file::serialized_reader::ReadOptionsBuilder;
1567    use crate::file::{
1568        reader::{FileReader, SerializedFileReader},
1569        statistics::Statistics,
1570    };
1571
1572    #[test]
1573    fn arrow_writer() {
1574        // define schema
1575        let schema = Schema::new(vec![
1576            Field::new("a", DataType::Int32, false),
1577            Field::new("b", DataType::Int32, true),
1578        ]);
1579
1580        // create some data
1581        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1582        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1583
1584        // build a record batch
1585        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
1586
1587        roundtrip(batch, Some(SMALL_SIZE / 2));
1588    }
1589
1590    fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1591        let mut buffer = vec![];
1592
1593        let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
1594        writer.write(expected_batch).unwrap();
1595        writer.close().unwrap();
1596
1597        buffer
1598    }
1599
1600    fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1601        let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
1602        writer.write(expected_batch).unwrap();
1603        writer.into_inner().unwrap()
1604    }
1605
1606    #[test]
1607    fn roundtrip_bytes() {
1608        // define schema
1609        let schema = Arc::new(Schema::new(vec![
1610            Field::new("a", DataType::Int32, false),
1611            Field::new("b", DataType::Int32, true),
1612        ]));
1613
1614        // create some data
1615        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1616        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1617
1618        // build a record batch
1619        let expected_batch =
1620            RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
1621
1622        for buffer in [
1623            get_bytes_after_close(schema.clone(), &expected_batch),
1624            get_bytes_by_into_inner(schema, &expected_batch),
1625        ] {
1626            let cursor = Bytes::from(buffer);
1627            let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
1628
1629            let actual_batch = record_batch_reader
1630                .next()
1631                .expect("No batch found")
1632                .expect("Unable to get batch");
1633
1634            assert_eq!(expected_batch.schema(), actual_batch.schema());
1635            assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
1636            assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
1637            for i in 0..expected_batch.num_columns() {
1638                let expected_data = expected_batch.column(i).to_data();
1639                let actual_data = actual_batch.column(i).to_data();
1640
1641                assert_eq!(expected_data, actual_data);
1642            }
1643        }
1644    }
1645
1646    #[test]
1647    fn arrow_writer_non_null() {
1648        // define schema
1649        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1650
1651        // create some data
1652        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1653
1654        // build a record batch
1655        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1656
1657        roundtrip(batch, Some(SMALL_SIZE / 2));
1658    }
1659
1660    #[test]
1661    fn arrow_writer_list() {
1662        // define schema
1663        let schema = Schema::new(vec![Field::new(
1664            "a",
1665            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1666            true,
1667        )]);
1668
1669        // create some data
1670        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1671
1672        // Construct a buffer for value offsets, for the nested array:
1673        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
1674        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
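        // Each consecutive pair of offsets delimits one sub-list: 0..1 -> [1],
        // 1..3 -> [2, 3], 3..3 -> the null slot (empty range; the nullity comes from the
        // validity bitmap below), 3..6 -> [4, 5, 6], 6..10 -> [7, 8, 9, 10].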
1675
1676        // Construct a list array from the above two
1677        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1678            DataType::Int32,
1679            false,
1680        ))))
1681        .len(5)
1682        .add_buffer(a_value_offsets)
1683        .add_child_data(a_values.into_data())
1684        .null_bit_buffer(Some(Buffer::from([0b00011011])))
1685        .build()
1686        .unwrap();
1687        let a = ListArray::from(a_list_data);
1688
1689        // build a record batch
1690        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1691
1692        assert_eq!(batch.column(0).null_count(), 1);
1693
1694        // This test fails if the max row group size is less than the batch's length
1695        // see https://github.com/apache/arrow-rs/issues/518
1696        roundtrip(batch, None);
1697    }
1698
1699    #[test]
1700    fn arrow_writer_list_non_null() {
1701        // define schema
1702        let schema = Schema::new(vec![Field::new(
1703            "a",
1704            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1705            false,
1706        )]);
1707
1708        // create some data
1709        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1710
1711        // Construct a buffer for value offsets, for the nested array:
1712        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
1713        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1714
1715        // Construct a list array from the above two
1716        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1717            DataType::Int32,
1718            false,
1719        ))))
1720        .len(5)
1721        .add_buffer(a_value_offsets)
1722        .add_child_data(a_values.into_data())
1723        .build()
1724        .unwrap();
1725        let a = ListArray::from(a_list_data);
1726
1727        // build a record batch
1728        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1729
1730        // This test fails if the max row group size is less than the batch's length
1731        // see https://github.com/apache/arrow-rs/issues/518
1732        assert_eq!(batch.column(0).null_count(), 0);
1733
1734        roundtrip(batch, None);
1735    }
1736
1737    #[test]
1738    fn arrow_writer_binary() {
1739        let string_field = Field::new("a", DataType::Utf8, false);
1740        let binary_field = Field::new("b", DataType::Binary, false);
1741        let schema = Schema::new(vec![string_field, binary_field]);
1742
1743        let raw_string_values = vec!["foo", "bar", "baz", "quux"];
1744        let raw_binary_values = [
1745            b"foo".to_vec(),
1746            b"bar".to_vec(),
1747            b"baz".to_vec(),
1748            b"quux".to_vec(),
1749        ];
1750        let raw_binary_value_refs = raw_binary_values
1751            .iter()
1752            .map(|x| x.as_slice())
1753            .collect::<Vec<_>>();
1754
1755        let string_values = StringArray::from(raw_string_values.clone());
1756        let binary_values = BinaryArray::from(raw_binary_value_refs);
1757        let batch = RecordBatch::try_new(
1758            Arc::new(schema),
1759            vec![Arc::new(string_values), Arc::new(binary_values)],
1760        )
1761        .unwrap();
1762
1763        roundtrip(batch, Some(SMALL_SIZE / 2));
1764    }
1765
1766    #[test]
1767    fn arrow_writer_binary_view() {
1768        let string_field = Field::new("a", DataType::Utf8View, false);
1769        let binary_field = Field::new("b", DataType::BinaryView, false);
1770        let nullable_string_field = Field::new("a", DataType::Utf8View, true);
1771        let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
1772
1773        let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
1774        let raw_binary_values = vec![
1775            b"foo".to_vec(),
1776            b"bar".to_vec(),
1777            b"large payload over 12 bytes".to_vec(),
1778            b"lulu".to_vec(),
1779        ];
1780        let nullable_string_values =
1781            vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
1782
1783        let string_view_values = StringViewArray::from(raw_string_values);
1784        let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
1785        let nullable_string_view_values = StringViewArray::from(nullable_string_values);
1786        let batch = RecordBatch::try_new(
1787            Arc::new(schema),
1788            vec![
1789                Arc::new(string_view_values),
1790                Arc::new(binary_view_values),
1791                Arc::new(nullable_string_view_values),
1792            ],
1793        )
1794        .unwrap();
1795
1796        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1797        roundtrip(batch, None);
1798    }
1799
1800    #[test]
1801    fn arrow_writer_binary_view_long_value() {
1802        let string_field = Field::new("a", DataType::Utf8View, false);
1803        let binary_field = Field::new("b", DataType::BinaryView, false);
1804        let schema = Schema::new(vec![string_field, binary_field]);
1805
1806        // There is special-case validation for long values (greater than 128):
1807        // 128 encodes as 0x80 0x00 0x00 0x00 in little endian, which should
1808        // trigger the long-string UTF-8 validation branch in the plain decoder.
1809        let long = "a".repeat(128);
1810        let raw_string_values = vec!["foo", long.as_str(), "bar"];
1811        let raw_binary_values = vec![b"foo".to_vec(), long.as_bytes().to_vec(), b"bar".to_vec()];
1812
1813        let string_view_values: ArrayRef = Arc::new(StringViewArray::from(raw_string_values));
1814        let binary_view_values: ArrayRef =
1815            Arc::new(BinaryViewArray::from_iter_values(raw_binary_values));
1816
1817        one_column_roundtrip(Arc::clone(&string_view_values), false);
1818        one_column_roundtrip(Arc::clone(&binary_view_values), false);
1819
1820        let batch = RecordBatch::try_new(
1821            Arc::new(schema),
1822            vec![string_view_values, binary_view_values],
1823        )
1824        .unwrap();
1825
1826        // Disable dictionary to exercise plain encoding paths in the reader.
1827        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
1828            let props = WriterProperties::builder()
1829                .set_writer_version(version)
1830                .set_dictionary_enabled(false)
1831                .build();
1832            roundtrip_opts(&batch, props);
1833        }
1834    }
1835
1836    fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
1837        let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
1838        let schema = Schema::new(vec![decimal_field]);
1839
1840        let decimal_values = vec![10_000, 50_000, 0, -100]
1841            .into_iter()
1842            .map(Some)
1843            .collect::<Decimal128Array>()
1844            .with_precision_and_scale(precision, scale)
1845            .unwrap();
1846
1847        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
1848    }
1849
1850    #[test]
1851    fn arrow_writer_decimal() {
1852        // int32 to store the decimal value
1853        let batch_int32_decimal = get_decimal_batch(5, 2);
1854        roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
1855        // int64 to store the decimal value
1856        let batch_int64_decimal = get_decimal_batch(12, 2);
1857        roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
1858        // fixed_length_byte_array to store the decimal value
1859        let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
1860        roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
1861    }
1862
1863    #[test]
1864    fn arrow_writer_complex() {
1865        // define schema
1866        let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1867        let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1868        let struct_field_g = Arc::new(Field::new_list(
1869            "g",
1870            Field::new_list_field(DataType::Int16, true),
1871            false,
1872        ));
1873        let struct_field_h = Arc::new(Field::new_list(
1874            "h",
1875            Field::new_list_field(DataType::Int16, false),
1876            true,
1877        ));
1878        let struct_field_e = Arc::new(Field::new_struct(
1879            "e",
1880            vec![
1881                struct_field_f.clone(),
1882                struct_field_g.clone(),
1883                struct_field_h.clone(),
1884            ],
1885            false,
1886        ));
1887        let schema = Schema::new(vec![
1888            Field::new("a", DataType::Int32, false),
1889            Field::new("b", DataType::Int32, true),
1890            Field::new_struct(
1891                "c",
1892                vec![struct_field_d.clone(), struct_field_e.clone()],
1893                false,
1894            ),
1895        ]);
1896
1897        // create some data
1898        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1899        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1900        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1901        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1902
1903        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1904
1905        // Construct a buffer for value offsets, for the nested array:
1906        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
1907        let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1908
1909        // Construct a list array from the above two
1910        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1911            .len(5)
1912            .add_buffer(g_value_offsets.clone())
1913            .add_child_data(g_value.to_data())
1914            .build()
1915            .unwrap();
1916        let g = ListArray::from(g_list_data);
1917        // The difference between g and h is that h has a null bitmap
1918        let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
1919            .len(5)
1920            .add_buffer(g_value_offsets)
1921            .add_child_data(g_value.to_data())
1922            .null_bit_buffer(Some(Buffer::from([0b00011011])))
1923            .build()
1924            .unwrap();
1925        let h = ListArray::from(h_list_data);
1926
1927        let e = StructArray::from(vec![
1928            (struct_field_f, Arc::new(f) as ArrayRef),
1929            (struct_field_g, Arc::new(g) as ArrayRef),
1930            (struct_field_h, Arc::new(h) as ArrayRef),
1931        ]);
1932
1933        let c = StructArray::from(vec![
1934            (struct_field_d, Arc::new(d) as ArrayRef),
1935            (struct_field_e, Arc::new(e) as ArrayRef),
1936        ]);
1937
1938        // build a record batch
1939        let batch = RecordBatch::try_new(
1940            Arc::new(schema),
1941            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
1942        )
1943        .unwrap();
1944
1945        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1946        roundtrip(batch, Some(SMALL_SIZE / 3));
1947    }
1948
1949    #[test]
1950    fn arrow_writer_complex_mixed() {
1951        // This test was added while investigating https://github.com/apache/arrow-rs/issues/244.
1952        // It was subsequently fixed while investigating https://github.com/apache/arrow-rs/issues/245.
1953
1954        // define schema
1955        let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
1956        let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
1957        let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
1958        let schema = Schema::new(vec![Field::new(
1959            "some_nested_object",
1960            DataType::Struct(Fields::from(vec![
1961                offset_field.clone(),
1962                partition_field.clone(),
1963                topic_field.clone(),
1964            ])),
1965            false,
1966        )]);
1967
1968        // create some data
1969        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1970        let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1971        let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
1972
1973        let some_nested_object = StructArray::from(vec![
1974            (offset_field, Arc::new(offset) as ArrayRef),
1975            (partition_field, Arc::new(partition) as ArrayRef),
1976            (topic_field, Arc::new(topic) as ArrayRef),
1977        ]);
1978
1979        // build a record batch
1980        let batch =
1981            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1982
1983        roundtrip(batch, Some(SMALL_SIZE / 2));
1984    }
1985
1986    #[test]
1987    fn arrow_writer_map() {
1988        // Note: we are using the JSON Arrow reader for brevity
1989        let json_content = r#"
1990        {"stocks":{"long": "$AAA", "short": "$BBB"}}
1991        {"stocks":{"long": null, "long": "$CCC", "short": null}}
1992        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1993        "#;
1994        let entries_struct_type = DataType::Struct(Fields::from(vec![
1995            Field::new("key", DataType::Utf8, false),
1996            Field::new("value", DataType::Utf8, true),
1997        ]));
1998        let stocks_field = Field::new(
1999            "stocks",
2000            DataType::Map(
2001                Arc::new(Field::new("entries", entries_struct_type, false)),
2002                false,
2003            ),
2004            true,
2005        );
2006        let schema = Arc::new(Schema::new(vec![stocks_field]));
2007        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
2008        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
2009
2010        let batch = reader.next().unwrap().unwrap();
2011        roundtrip(batch, None);
2012    }
2013
2014    #[test]
2015    fn arrow_writer_2_level_struct() {
2016        // tests writing struct<struct<primitive>>
2017        let field_c = Field::new("c", DataType::Int32, true);
2018        let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
2019        let type_a = DataType::Struct(vec![field_b.clone()].into());
2020        let field_a = Field::new("a", type_a, true);
2021        let schema = Schema::new(vec![field_a.clone()]);
2022
2023        // create data
2024        let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
2025        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2026            .len(6)
2027            .null_bit_buffer(Some(Buffer::from([0b00100111])))
2028            .add_child_data(c.into_data())
2029            .build()
2030            .unwrap();
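        // Bit i of the validity bitmap corresponds to row i, so 0b00100111 marks rows 3
        // and 4 of `b` as null, and 0b00101111 below leaves only row 4 of `a` null,
        // matching the assertions further down.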
2031        let b = StructArray::from(b_data);
2032        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2033            .len(6)
2034            .null_bit_buffer(Some(Buffer::from([0b00101111])))
2035            .add_child_data(b.into_data())
2036            .build()
2037            .unwrap();
2038        let a = StructArray::from(a_data);
2039
2040        assert_eq!(a.null_count(), 1);
2041        assert_eq!(a.column(0).null_count(), 2);
2042
2043        // build a record batch
2044        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2045
2046        roundtrip(batch, Some(SMALL_SIZE / 2));
2047    }
2048
2049    #[test]
2050    fn arrow_writer_2_level_struct_non_null() {
2051        // tests writing struct<struct<primitive>>
2052        let field_c = Field::new("c", DataType::Int32, false);
2053        let type_b = DataType::Struct(vec![field_c].into());
2054        let field_b = Field::new("b", type_b.clone(), false);
2055        let type_a = DataType::Struct(vec![field_b].into());
2056        let field_a = Field::new("a", type_a.clone(), false);
2057        let schema = Schema::new(vec![field_a]);
2058
2059        // create data
2060        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2061        let b_data = ArrayDataBuilder::new(type_b)
2062            .len(6)
2063            .add_child_data(c.into_data())
2064            .build()
2065            .unwrap();
2066        let b = StructArray::from(b_data);
2067        let a_data = ArrayDataBuilder::new(type_a)
2068            .len(6)
2069            .add_child_data(b.into_data())
2070            .build()
2071            .unwrap();
2072        let a = StructArray::from(a_data);
2073
2074        assert_eq!(a.null_count(), 0);
2075        assert_eq!(a.column(0).null_count(), 0);
2076
2077        // build a record batch
2078        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2079
2080        roundtrip(batch, Some(SMALL_SIZE / 2));
2081    }
2082
2083    #[test]
2084    fn arrow_writer_2_level_struct_mixed_null() {
2085        // tests writing struct<struct<primitive>>
2086        let field_c = Field::new("c", DataType::Int32, false);
2087        let type_b = DataType::Struct(vec![field_c].into());
2088        let field_b = Field::new("b", type_b.clone(), true);
2089        let type_a = DataType::Struct(vec![field_b].into());
2090        let field_a = Field::new("a", type_a.clone(), false);
2091        let schema = Schema::new(vec![field_a]);
2092
2093        // create data
2094        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2095        let b_data = ArrayDataBuilder::new(type_b)
2096            .len(6)
2097            .null_bit_buffer(Some(Buffer::from([0b00100111])))
2098            .add_child_data(c.into_data())
2099            .build()
2100            .unwrap();
2101        let b = StructArray::from(b_data);
2102        // `a` intentionally has no null buffer, to test that this is handled correctly
2103        let a_data = ArrayDataBuilder::new(type_a)
2104            .len(6)
2105            .add_child_data(b.into_data())
2106            .build()
2107            .unwrap();
2108        let a = StructArray::from(a_data);
2109
2110        assert_eq!(a.null_count(), 0);
2111        assert_eq!(a.column(0).null_count(), 2);
2112
2113        // build a record batch
2114        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2115
2116        roundtrip(batch, Some(SMALL_SIZE / 2));
2117    }
2118
2119    #[test]
2120    fn arrow_writer_2_level_struct_mixed_null_2() {
2121        // tests writing struct<struct<primitive>>, where the primitive columns are non-null.
2122        let field_c = Field::new("c", DataType::Int32, false);
2123        let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
2124        let field_e = Field::new(
2125            "e",
2126            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2127            false,
2128        );
2129
2130        let field_b = Field::new(
2131            "b",
2132            DataType::Struct(vec![field_c, field_d, field_e].into()),
2133            false,
2134        );
2135        let type_a = DataType::Struct(vec![field_b.clone()].into());
2136        let field_a = Field::new("a", type_a, true);
2137        let schema = Schema::new(vec![field_a.clone()]);
2138
2139        // create data
2140        let c = Int32Array::from_iter_values(0..6);
2141        let d = FixedSizeBinaryArray::try_from_iter(
2142            ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
2143        )
2144        .expect("four byte values");
2145        let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
2146        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2147            .len(6)
2148            .add_child_data(c.into_data())
2149            .add_child_data(d.into_data())
2150            .add_child_data(e.into_data())
2151            .build()
2152            .unwrap();
2153        let b = StructArray::from(b_data);
2154        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2155            .len(6)
2156            .null_bit_buffer(Some(Buffer::from([0b00100101])))
2157            .add_child_data(b.into_data())
2158            .build()
2159            .unwrap();
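        // The 0b00100101 validity bitmap keeps only rows 0, 2 and 5 of `a` valid, so `a`
        // has three null rows while the child struct `b` (which has no validity bitmap)
        // has none, matching the assertions below.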
2160        let a = StructArray::from(a_data);
2161
2162        assert_eq!(a.null_count(), 3);
2163        assert_eq!(a.column(0).null_count(), 0);
2164
2165        // build a record batch
2166        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2167
2168        roundtrip(batch, Some(SMALL_SIZE / 2));
2169    }
2170
2171    #[test]
2172    fn test_fixed_size_binary_in_dict() {
2173        fn test_fixed_size_binary_in_dict_inner<K>()
2174        where
2175            K: ArrowDictionaryKeyType,
2176            K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
2177            <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
2178        {
2179            let field = Field::new(
2180                "a",
2181                DataType::Dictionary(
2182                    Box::new(K::DATA_TYPE),
2183                    Box::new(DataType::FixedSizeBinary(4)),
2184                ),
2185                false,
2186            );
2187            let schema = Schema::new(vec![field]);
2188
2189            let keys: Vec<K::Native> = vec![
2190                K::Native::try_from(0u8).unwrap(),
2191                K::Native::try_from(0u8).unwrap(),
2192                K::Native::try_from(1u8).unwrap(),
2193            ];
2194            let keys = PrimitiveArray::<K>::from_iter_values(keys);
2195            let values = FixedSizeBinaryArray::try_from_iter(
2196                vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
2197            )
2198            .unwrap();
2199
2200            let data = DictionaryArray::<K>::new(keys, Arc::new(values));
2201            let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
2202            roundtrip(batch, None);
2203        }
2204
2205        test_fixed_size_binary_in_dict_inner::<UInt8Type>();
2206        test_fixed_size_binary_in_dict_inner::<UInt16Type>();
2207        test_fixed_size_binary_in_dict_inner::<UInt32Type>();
2208        test_fixed_size_binary_in_dict_inner::<UInt64Type>();
2209        test_fixed_size_binary_in_dict_inner::<Int8Type>();
2210        test_fixed_size_binary_in_dict_inner::<Int16Type>();
2211        test_fixed_size_binary_in_dict_inner::<Int32Type>();
2212        test_fixed_size_binary_in_dict_inner::<Int64Type>();
2213    }
2214
2215    #[test]
2216    fn test_empty_dict() {
2217        let struct_fields = Fields::from(vec![Field::new(
2218            "dict",
2219            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2220            false,
2221        )]);
2222
2223        let schema = Schema::new(vec![Field::new_struct(
2224            "struct",
2225            struct_fields.clone(),
2226            true,
2227        )]);
2228        let dictionary = Arc::new(DictionaryArray::new(
2229            Int32Array::new_null(5),
2230            Arc::new(StringArray::new_null(0)),
2231        ));
2232
2233        let s = StructArray::new(
2234            struct_fields,
2235            vec![dictionary],
2236            Some(NullBuffer::new_null(5)),
2237        );
2238
2239        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
2240        roundtrip(batch, None);
2241    }
2242    #[test]
2243    fn arrow_writer_page_size() {
2244        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
2245
2246        let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
2247
2248        // Generate an array of 10 unique 10-character strings
2249        for i in 0..10 {
2250            let value = i
2251                .to_string()
2252                .repeat(10)
2253                .chars()
2254                .take(10)
2255                .collect::<String>();
2256
2257            builder.append_value(value);
2258        }
2259
2260        let array = Arc::new(builder.finish());
2261
2262        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
2263
2264        let file = tempfile::tempfile().unwrap();
2265
2266        // Set everything very low so we fall back to PLAIN encoding after the first row
2267        let props = WriterProperties::builder()
2268            .set_data_page_size_limit(1)
2269            .set_dictionary_page_size_limit(1)
2270            .set_write_batch_size(1)
2271            .build();
2272
2273        let mut writer =
2274            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
2275                .expect("Unable to write file");
2276        writer.write(&batch).unwrap();
2277        writer.close().unwrap();
2278
2279        let options = ReadOptionsBuilder::new().with_page_index().build();
2280        let reader =
2281            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap();
2282
2283        let column = reader.metadata().row_group(0).columns();
2284
2285        assert_eq!(column.len(), 1);
2286
2287        // We should write one row before falling back to PLAIN encoding, so there should
2288        // still be a dictionary page.
2289        assert!(
2290            column[0].dictionary_page_offset().is_some(),
2291            "Expected a dictionary page"
2292        );
2293
2294        assert!(reader.metadata().offset_index().is_some());
2295        let offset_indexes = &reader.metadata().offset_index().unwrap()[0];
2296
2297        let page_locations = offset_indexes[0].page_locations.clone();
2298
2299        // We should fall back to PLAIN encoding after the first row, and our max page size is
2300        // 1 byte, so we expect one dictionary-encoded page and then a page per row thereafter.
2301        assert_eq!(
2302            page_locations.len(),
2303            10,
2304            "Expected 10 pages but got {page_locations:#?}"
2305        );
2306    }
2307
2308    #[test]
2309    fn arrow_writer_float_nans() {
2310        let f16_field = Field::new("a", DataType::Float16, false);
2311        let f32_field = Field::new("b", DataType::Float32, false);
2312        let f64_field = Field::new("c", DataType::Float64, false);
2313        let schema = Schema::new(vec![f16_field, f32_field, f64_field]);
2314
2315        let f16_values = (0..MEDIUM_SIZE)
2316            .map(|i| {
2317                Some(if i % 2 == 0 {
2318                    f16::NAN
2319                } else {
2320                    f16::from_f32(i as f32)
2321                })
2322            })
2323            .collect::<Float16Array>();
2324
2325        let f32_values = (0..MEDIUM_SIZE)
2326            .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
2327            .collect::<Float32Array>();
2328
2329        let f64_values = (0..MEDIUM_SIZE)
2330            .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
2331            .collect::<Float64Array>();
2332
2333        let batch = RecordBatch::try_new(
2334            Arc::new(schema),
2335            vec![
2336                Arc::new(f16_values),
2337                Arc::new(f32_values),
2338                Arc::new(f64_values),
2339            ],
2340        )
2341        .unwrap();
2342
2343        roundtrip(batch, None);
2344    }
2345
2346    const SMALL_SIZE: usize = 7;
2347    const MEDIUM_SIZE: usize = 63;
2348
2349    // Write the batch to parquet and read it back out, ensuring
2350    // that what comes out is the same as what was written in
2351    fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<Bytes> {
2352        let mut files = vec![];
2353        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2354            let mut props = WriterProperties::builder().set_writer_version(version);
2355
2356            if let Some(size) = max_row_group_size {
2357                props = props.set_max_row_group_size(size)
2358            }
2359
2360            let props = props.build();
2361            files.push(roundtrip_opts(&expected_batch, props))
2362        }
2363        files
2364    }
2365
2366    // Round trip the specified record batch with the specified writer properties,
2367    // to an in-memory file, and validate the arrays using the specified function.
2368    // Returns the in-memory file.
2369    fn roundtrip_opts_with_array_validation<F>(
2370        expected_batch: &RecordBatch,
2371        props: WriterProperties,
2372        validate: F,
2373    ) -> Bytes
2374    where
2375        F: Fn(&ArrayData, &ArrayData),
2376    {
2377        let mut file = vec![];
2378
2379        let mut writer = ArrowWriter::try_new(&mut file, expected_batch.schema(), Some(props))
2380            .expect("Unable to write file");
2381        writer.write(expected_batch).unwrap();
2382        writer.close().unwrap();
2383
2384        let file = Bytes::from(file);
2385        let mut record_batch_reader =
2386            ParquetRecordBatchReader::try_new(file.clone(), 1024).unwrap();
2387
2388        let actual_batch = record_batch_reader
2389            .next()
2390            .expect("No batch found")
2391            .expect("Unable to get batch");
2392
2393        assert_eq!(expected_batch.schema(), actual_batch.schema());
2394        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
2395        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
2396        for i in 0..expected_batch.num_columns() {
2397            let expected_data = expected_batch.column(i).to_data();
2398            let actual_data = actual_batch.column(i).to_data();
2399            validate(&expected_data, &actual_data);
2400        }
2401
2402        file
2403    }
2404
2405    fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> Bytes {
2406        roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
2407            a.validate_full().expect("valid expected data");
2408            b.validate_full().expect("valid actual data");
2409            assert_eq!(a, b)
2410        })
2411    }
2412
2413    struct RoundTripOptions {
2414        values: ArrayRef,
2415        schema: SchemaRef,
2416        bloom_filter: bool,
2417        bloom_filter_position: BloomFilterPosition,
2418    }
2419
2420    impl RoundTripOptions {
2421        fn new(values: ArrayRef, nullable: bool) -> Self {
2422            let data_type = values.data_type().clone();
2423            let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
2424            Self {
2425                values,
2426                schema: Arc::new(schema),
2427                bloom_filter: false,
2428                bloom_filter_position: BloomFilterPosition::AfterRowGroup,
2429            }
2430        }
2431    }
2432
2433    fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<Bytes> {
2434        one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
2435    }
2436
2437    fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<Bytes> {
2438        let mut options = RoundTripOptions::new(values, false);
2439        options.schema = schema;
2440        one_column_roundtrip_with_options(options)
2441    }
2442
2443    fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<Bytes> {
2444        let RoundTripOptions {
2445            values,
2446            schema,
2447            bloom_filter,
2448            bloom_filter_position,
2449        } = options;
2450
2451        let encodings = match values.data_type() {
2452            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
2453                vec![
2454                    Encoding::PLAIN,
2455                    Encoding::DELTA_BYTE_ARRAY,
2456                    Encoding::DELTA_LENGTH_BYTE_ARRAY,
2457                ]
2458            }
2459            DataType::Int64
2460            | DataType::Int32
2461            | DataType::Int16
2462            | DataType::Int8
2463            | DataType::UInt64
2464            | DataType::UInt32
2465            | DataType::UInt16
2466            | DataType::UInt8 => vec![
2467                Encoding::PLAIN,
2468                Encoding::DELTA_BINARY_PACKED,
2469                Encoding::BYTE_STREAM_SPLIT,
2470            ],
2471            DataType::Float32 | DataType::Float64 => {
2472                vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
2473            }
2474            _ => vec![Encoding::PLAIN],
2475        };
2476
2477        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2478
2479        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2480
2481        let mut files = vec![];
2482        for dictionary_size in [0, 1, 1024] {
2483            for encoding in &encodings {
2484                for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2485                    for row_group_size in row_group_sizes {
2486                        let props = WriterProperties::builder()
2487                            .set_writer_version(version)
2488                            .set_max_row_group_size(row_group_size)
2489                            .set_dictionary_enabled(dictionary_size != 0)
2490                            .set_dictionary_page_size_limit(dictionary_size.max(1))
2491                            .set_encoding(*encoding)
2492                            .set_bloom_filter_enabled(bloom_filter)
2493                            .set_bloom_filter_position(bloom_filter_position)
2494                            .build();
2495
2496                        files.push(roundtrip_opts(&expected_batch, props))
2497                    }
2498                }
2499            }
2500        }
2501        files
2502    }
2503
2504    fn values_required<A, I>(iter: I) -> Vec<Bytes>
2505    where
2506        A: From<Vec<I::Item>> + Array + 'static,
2507        I: IntoIterator,
2508    {
2509        let raw_values: Vec<_> = iter.into_iter().collect();
2510        let values = Arc::new(A::from(raw_values));
2511        one_column_roundtrip(values, false)
2512    }
2513
2514    fn values_optional<A, I>(iter: I) -> Vec<Bytes>
2515    where
2516        A: From<Vec<Option<I::Item>>> + Array + 'static,
2517        I: IntoIterator,
2518    {
2519        let optional_raw_values: Vec<_> = iter
2520            .into_iter()
2521            .enumerate()
2522            .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
2523            .collect();
2524        let optional_values = Arc::new(A::from(optional_raw_values));
2525        one_column_roundtrip(optional_values, true)
2526    }
2527
2528    fn required_and_optional<A, I>(iter: I)
2529    where
2530        A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
2531        I: IntoIterator + Clone,
2532    {
2533        values_required::<A, I>(iter.clone());
2534        values_optional::<A, I>(iter);
2535    }
2536
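    // Reads each file back with bloom-filter reading enabled, collects the filter for
    // `file_column` from every row group, and asserts that every positive value is found
    // in at least one filter while none of the negative values are.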
2537    fn check_bloom_filter<T: AsBytes>(
2538        files: Vec<Bytes>,
2539        file_column: String,
2540        positive_values: Vec<T>,
2541        negative_values: Vec<T>,
2542    ) {
2543        files.into_iter().take(1).for_each(|file| {
2544            let file_reader = SerializedFileReader::new_with_options(
2545                file,
2546                ReadOptionsBuilder::new()
2547                    .with_reader_properties(
2548                        ReaderProperties::builder()
2549                            .set_read_bloom_filter(true)
2550                            .build(),
2551                    )
2552                    .build(),
2553            )
2554            .expect("Unable to open file as Parquet");
2555            let metadata = file_reader.metadata();
2556
2557            // Gets bloom filters from all row groups.
2558            let mut bloom_filters: Vec<_> = vec![];
2559            for (ri, row_group) in metadata.row_groups().iter().enumerate() {
2560                if let Some((column_index, _)) = row_group
2561                    .columns()
2562                    .iter()
2563                    .enumerate()
2564                    .find(|(_, column)| column.column_path().string() == file_column)
2565                {
2566                    let row_group_reader = file_reader
2567                        .get_row_group(ri)
2568                        .expect("Unable to read row group");
2569                    if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
2570                        bloom_filters.push(sbbf.clone());
2571                    } else {
2572                        panic!("No bloom filter for column named {file_column} found");
2573                    }
2574                } else {
2575                    panic!("No column named {file_column} found");
2576                }
2577            }
2578
2579            positive_values.iter().for_each(|value| {
2580                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2581                assert!(
2582                    found.is_some(),
2583                    "{}",
2584                    format!("Value {:?} should be in bloom filter", value.as_bytes())
2585                );
2586            });
2587
2588            negative_values.iter().for_each(|value| {
2589                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2590                assert!(
2591                    found.is_none(),
2592                    "{}",
2593                    format!("Value {:?} should not be in bloom filter", value.as_bytes())
2594                );
2595            });
2596        });
2597    }
2598
2599    #[test]
2600    fn all_null_primitive_single_column() {
2601        let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
2602        one_column_roundtrip(values, true);
2603    }
2604    #[test]
2605    fn null_single_column() {
2606        let values = Arc::new(NullArray::new(SMALL_SIZE));
2607        one_column_roundtrip(values, true);
2608        // null arrays are always nullable; a test with non-nullable nulls fails
2609    }
2610
2611    #[test]
2612    fn bool_single_column() {
2613        required_and_optional::<BooleanArray, _>(
2614            [true, false].iter().cycle().copied().take(SMALL_SIZE),
2615        );
2616    }
2617
2618    #[test]
2619    fn bool_large_single_column() {
2620        let values = Arc::new(
2621            [None, Some(true), Some(false)]
2622                .iter()
2623                .cycle()
2624                .copied()
2625                .take(200_000)
2626                .collect::<BooleanArray>(),
2627        );
2628        let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
2629        let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2630        let file = tempfile::tempfile().unwrap();
2631
2632        let mut writer =
2633            ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
2634                .expect("Unable to write file");
2635        writer.write(&expected_batch).unwrap();
2636        writer.close().unwrap();
2637    }
2638
2639    #[test]
2640    fn check_page_offset_index_with_nan() {
2641        let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
2642        let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
2643        let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2644
2645        let mut out = Vec::with_capacity(1024);
2646        let mut writer =
2647            ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
2648        writer.write(&batch).unwrap();
2649        let file_meta_data = writer.close().unwrap();
2650        for row_group in file_meta_data.row_groups() {
2651            for column in row_group.columns() {
2652                assert!(column.offset_index_offset().is_some());
2653                assert!(column.offset_index_length().is_some());
2654                assert!(column.column_index_offset().is_none());
2655                assert!(column.column_index_length().is_none());
2656            }
2657        }
2658    }
2659
2660    #[test]
2661    fn i8_single_column() {
2662        required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
2663    }
2664
2665    #[test]
2666    fn i16_single_column() {
2667        required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
2668    }
2669
2670    #[test]
2671    fn i32_single_column() {
2672        required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
2673    }
2674
2675    #[test]
2676    fn i64_single_column() {
2677        required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
2678    }
2679
2680    #[test]
2681    fn u8_single_column() {
2682        required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
2683    }
2684
2685    #[test]
2686    fn u16_single_column() {
2687        required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
2688    }
2689
2690    #[test]
2691    fn u32_single_column() {
2692        required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
2693    }
2694
2695    #[test]
2696    fn u64_single_column() {
2697        required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
2698    }
2699
2700    #[test]
2701    fn f32_single_column() {
2702        required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
2703    }
2704
2705    #[test]
2706    fn f64_single_column() {
2707        required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
2708    }
2709
2710    // The timestamp array types don't implement From<Vec<T>> because they need the timezone
2711    // argument, and they also don't support building from a Vec<Option<T>>, so call
2712    // one_column_roundtrip manually instead of calling required_and_optional for these tests.
2713
2714    #[test]
2715    fn timestamp_second_single_column() {
2716        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2717        let values = Arc::new(TimestampSecondArray::from(raw_values));
2718
2719        one_column_roundtrip(values, false);
2720    }
2721
2722    #[test]
2723    fn timestamp_millisecond_single_column() {
2724        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2725        let values = Arc::new(TimestampMillisecondArray::from(raw_values));
2726
2727        one_column_roundtrip(values, false);
2728    }
2729
2730    #[test]
2731    fn timestamp_microsecond_single_column() {
2732        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2733        let values = Arc::new(TimestampMicrosecondArray::from(raw_values));
2734
2735        one_column_roundtrip(values, false);
2736    }
2737
2738    #[test]
2739    fn timestamp_nanosecond_single_column() {
2740        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2741        let values = Arc::new(TimestampNanosecondArray::from(raw_values));
2742
2743        one_column_roundtrip(values, false);
2744    }
2745
2746    #[test]
2747    fn date32_single_column() {
2748        required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
2749    }
2750
2751    #[test]
2752    fn date64_single_column() {
2753        // Date64 must be a multiple of 86400000, see ARROW-10925
2754        required_and_optional::<Date64Array, _>(
2755            (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
2756        );
2757    }
2758
2759    #[test]
2760    fn time32_second_single_column() {
2761        required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
2762    }
2763
2764    #[test]
2765    fn time32_millisecond_single_column() {
2766        required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
2767    }
2768
2769    #[test]
2770    fn time64_microsecond_single_column() {
2771        required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
2772    }
2773
2774    #[test]
2775    fn time64_nanosecond_single_column() {
2776        required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
2777    }
2778
2779    #[test]
2780    fn duration_second_single_column() {
2781        required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
2782    }
2783
2784    #[test]
2785    fn duration_millisecond_single_column() {
2786        required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
2787    }
2788
2789    #[test]
2790    fn duration_microsecond_single_column() {
2791        required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
2792    }
2793
2794    #[test]
2795    fn duration_nanosecond_single_column() {
2796        required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
2797    }
2798
2799    #[test]
2800    fn interval_year_month_single_column() {
2801        required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
2802    }
2803
2804    #[test]
2805    fn interval_day_time_single_column() {
2806        required_and_optional::<IntervalDayTimeArray, _>(vec![
2807            IntervalDayTime::new(0, 1),
2808            IntervalDayTime::new(0, 3),
2809            IntervalDayTime::new(3, -2),
2810            IntervalDayTime::new(-200, 4),
2811        ]);
2812    }
2813
2814    #[test]
2815    #[should_panic(
2816        expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
2817    )]
2818    fn interval_month_day_nano_single_column() {
2819        required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
2820            IntervalMonthDayNano::new(0, 1, 5),
2821            IntervalMonthDayNano::new(0, 3, 2),
2822            IntervalMonthDayNano::new(3, -2, -5),
2823            IntervalMonthDayNano::new(-200, 4, -1),
2824        ]);
2825    }
2826
2827    #[test]
2828    fn binary_single_column() {
2829        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2830        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2831        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2832
2833        // BinaryArrays can't be built from Vec<Option<&[u8]>>, so only call `values_required`
2834        values_required::<BinaryArray, _>(many_vecs_iter);
2835    }
2836
2837    #[test]
2838    fn binary_view_single_column() {
2839        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2840        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2841        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2842
2843        // BinaryViewArrays can't be built from Vec<Option<&[u8]>>, so only call `values_required`
2844        values_required::<BinaryViewArray, _>(many_vecs_iter);
2845    }
2846
2847    #[test]
2848    fn i32_column_bloom_filter_at_end() {
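        // BloomFilterPosition::End writes the bloom filters after all row group data rather than
        // immediately after each row group; check_bloom_filter should still be able to locate them.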
2849        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2850        let mut options = RoundTripOptions::new(array, false);
2851        options.bloom_filter = true;
2852        options.bloom_filter_position = BloomFilterPosition::End;
2853
2854        let files = one_column_roundtrip_with_options(options);
2855        check_bloom_filter(
2856            files,
2857            "col".to_string(),
2858            (0..SMALL_SIZE as i32).collect(),
2859            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2860        );
2861    }
2862
2863    #[test]
2864    fn i32_column_bloom_filter() {
2865        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2866        let mut options = RoundTripOptions::new(array, false);
2867        options.bloom_filter = true;
2868
2869        let files = one_column_roundtrip_with_options(options);
2870        check_bloom_filter(
2871            files,
2872            "col".to_string(),
2873            (0..SMALL_SIZE as i32).collect(),
2874            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2875        );
2876    }
2877
2878    #[test]
2879    fn binary_column_bloom_filter() {
2880        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2881        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2882        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2883
2884        let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
2885        let mut options = RoundTripOptions::new(array, false);
2886        options.bloom_filter = true;
2887
2888        let files = one_column_roundtrip_with_options(options);
2889        check_bloom_filter(
2890            files,
2891            "col".to_string(),
2892            many_vecs,
2893            vec![vec![(SMALL_SIZE + 1) as u8]],
2894        );
2895    }
2896
2897    #[test]
2898    fn empty_string_null_column_bloom_filter() {
2899        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2900        let raw_strs = raw_values.iter().map(|s| s.as_str());
2901
2902        let array = Arc::new(StringArray::from_iter_values(raw_strs));
2903        let mut options = RoundTripOptions::new(array, false);
2904        options.bloom_filter = true;
2905
2906        let files = one_column_roundtrip_with_options(options);
2907
2908        let optional_raw_values: Vec<_> = raw_values
2909            .iter()
2910            .enumerate()
2911            .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
2912            .collect();
2913        // For null slots, empty string should not be in bloom filter.
2914        check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
2915    }
2916
2917    #[test]
2918    fn large_binary_single_column() {
2919        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2920        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2921        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2922
2923        // LargeBinaryArrays can't be built from Vec<Option<&[u8]>>, so only call `values_required`
2924        values_required::<LargeBinaryArray, _>(many_vecs_iter);
2925    }
2926
2927    #[test]
2928    fn fixed_size_binary_single_column() {
2929        let mut builder = FixedSizeBinaryBuilder::new(4);
2930        builder.append_value(b"0123").unwrap();
2931        builder.append_null();
2932        builder.append_value(b"8910").unwrap();
2933        builder.append_value(b"1112").unwrap();
2934        let array = Arc::new(builder.finish());
2935
2936        one_column_roundtrip(array, true);
2937    }
2938
2939    #[test]
2940    fn string_single_column() {
2941        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2942        let raw_strs = raw_values.iter().map(|s| s.as_str());
2943
2944        required_and_optional::<StringArray, _>(raw_strs);
2945    }
2946
2947    #[test]
2948    fn large_string_single_column() {
2949        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2950        let raw_strs = raw_values.iter().map(|s| s.as_str());
2951
2952        required_and_optional::<LargeStringArray, _>(raw_strs);
2953    }
2954
2955    #[test]
2956    fn string_view_single_column() {
2957        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2958        let raw_strs = raw_values.iter().map(|s| s.as_str());
2959
2960        required_and_optional::<StringViewArray, _>(raw_strs);
2961    }
2962
2963    #[test]
2964    fn null_list_single_column() {
2965        let null_field = Field::new_list_field(DataType::Null, true);
2966        let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);
2967
2968        let schema = Schema::new(vec![list_field]);
2969
2970        // Build [[], null, [null, null]]
2971        let a_values = NullArray::new(2);
2972        let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
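        // Offsets [0, 0, 0, 2] encode three lists of lengths 0, 0 and 2 over the two NullArray values.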
2973        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2974            DataType::Null,
2975            true,
2976        ))))
2977        .len(3)
2978        .add_buffer(a_value_offsets)
2979        .null_bit_buffer(Some(Buffer::from([0b00000101])))
2980        .add_child_data(a_values.into_data())
2981        .build()
2982        .unwrap();
2983
2984        let a = ListArray::from(a_list_data);
2985
2986        assert!(a.is_valid(0));
2987        assert!(!a.is_valid(1));
2988        assert!(a.is_valid(2));
2989
2990        assert_eq!(a.value(0).len(), 0);
2991        assert_eq!(a.value(2).len(), 2);
2992        assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);
2993
2994        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2995        roundtrip(batch, None);
2996    }
2997
2998    #[test]
2999    fn list_single_column() {
3000        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3001        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
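        // Offsets [0, 1, 3, 3, 6, 10] give list lengths 1, 2, 0, 3 and 4; the validity bitmap
        // below (0b00011011) marks index 2 as null.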
3002        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
3003            DataType::Int32,
3004            false,
3005        ))))
3006        .len(5)
3007        .add_buffer(a_value_offsets)
3008        .null_bit_buffer(Some(Buffer::from([0b00011011])))
3009        .add_child_data(a_values.into_data())
3010        .build()
3011        .unwrap();
3012
3013        assert_eq!(a_list_data.null_count(), 1);
3014
3015        let a = ListArray::from(a_list_data);
3016        let values = Arc::new(a);
3017
3018        one_column_roundtrip(values, true);
3019    }
3020
3021    #[test]
3022    fn large_list_single_column() {
3023        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3024        let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
3025        let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
3026            "large_item",
3027            DataType::Int32,
3028            true,
3029        ))))
3030        .len(5)
3031        .add_buffer(a_value_offsets)
3032        .add_child_data(a_values.into_data())
3033        .null_bit_buffer(Some(Buffer::from([0b00011011])))
3034        .build()
3035        .unwrap();
3036
3037        // The validity bitmap (0b00011011) marks index 2 as null, so exactly one null is expected
3038        assert_eq!(a_list_data.null_count(), 1);
3039
3040        let a = LargeListArray::from(a_list_data);
3041        let values = Arc::new(a);
3042
3043        one_column_roundtrip(values, true);
3044    }
3045
3046    #[test]
3047    fn list_nested_nulls() {
3048        use arrow::datatypes::Int32Type;
3049        let data = vec![
3050            Some(vec![Some(1)]),
3051            Some(vec![Some(2), Some(3)]),
3052            None,
3053            Some(vec![Some(4), Some(5), None]),
3054            Some(vec![None]),
3055            Some(vec![Some(6), Some(7)]),
3056        ];
3057
3058        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
3059        one_column_roundtrip(Arc::new(list), true);
3060
3061        let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
3062        one_column_roundtrip(Arc::new(list), true);
3063    }
3064
3065    #[test]
3066    fn struct_single_column() {
3067        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3068        let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
3069        let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
3070
3071        let values = Arc::new(s);
3072        one_column_roundtrip(values, false);
3073    }
3074
3075    #[test]
3076    fn list_and_map_coerced_names() {
3077        // Create map and list with non-Parquet naming
3078        let list_field =
3079            Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
3080        let map_field = Field::new_map(
3081            "my_map",
3082            "entries",
3083            Field::new("keys", DataType::Int32, false),
3084            Field::new("values", DataType::Int32, true),
3085            false,
3086            true,
3087        );
3088
3089        let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
3090        let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();
3091
3092        let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
3093
3094        // Write data to Parquet but coerce names to match spec
3095        let props = Some(WriterProperties::builder().set_coerce_types(true).build());
3096        let file = tempfile::tempfile().unwrap();
3097        let mut writer =
3098            ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();
3099
3100        let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
3101        writer.write(&batch).unwrap();
3102        let file_metadata = writer.close().unwrap();
3103
3104        let schema = file_metadata.file_metadata().schema();
3105        // Coerced name of "item" should be "element"
3106        let list_field = &schema.get_fields()[0].get_fields()[0];
3107        assert_eq!(list_field.get_fields()[0].name(), "element");
3108
3109        let map_field = &schema.get_fields()[1].get_fields()[0];
3110        // Coerced name of "entries" should be "key_value"
3111        assert_eq!(map_field.name(), "key_value");
3112        // Coerced name of "keys" should be "key"
3113        assert_eq!(map_field.get_fields()[0].name(), "key");
3114        // Coerced name of "values" should be "value"
3115        assert_eq!(map_field.get_fields()[1].name(), "value");
3116
3117        // Double check schema after reading from the file
3118        let reader = SerializedFileReader::new(file).unwrap();
3119        let file_schema = reader.metadata().file_metadata().schema();
3120        let fields = file_schema.get_fields();
3121        let list_field = &fields[0].get_fields()[0];
3122        assert_eq!(list_field.get_fields()[0].name(), "element");
3123        let map_field = &fields[1].get_fields()[0];
3124        assert_eq!(map_field.name(), "key_value");
3125        assert_eq!(map_field.get_fields()[0].name(), "key");
3126        assert_eq!(map_field.get_fields()[1].name(), "value");
3127    }
3128
3129    #[test]
3130    fn fallback_flush_data_page() {
3131        // Tests that Fallback::flush_data_page clears all buffers correctly
3132        let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
3133        let values = Arc::new(StringArray::from(raw_values));
3134        let encodings = vec![
3135            Encoding::DELTA_BYTE_ARRAY,
3136            Encoding::DELTA_LENGTH_BYTE_ARRAY,
3137        ];
3138        let data_type = values.data_type().clone();
3139        let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
3140        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3141
3142        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
3143        let data_page_size_limit: usize = 32;
3144        let write_batch_size: usize = 16;
3145
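        // A tiny data page size limit combined with a small write batch size forces frequent page
        // flushes, exercising the fallback encoder's buffer reset across many row group sizes.
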
3146        for encoding in &encodings {
3147            for row_group_size in row_group_sizes {
3148                let props = WriterProperties::builder()
3149                    .set_writer_version(WriterVersion::PARQUET_2_0)
3150                    .set_max_row_group_size(row_group_size)
3151                    .set_dictionary_enabled(false)
3152                    .set_encoding(*encoding)
3153                    .set_data_page_size_limit(data_page_size_limit)
3154                    .set_write_batch_size(write_batch_size)
3155                    .build();
3156
3157                roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
3158                    let string_array_a = StringArray::from(a.clone());
3159                    let string_array_b = StringArray::from(b.clone());
3160                    let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
3161                    let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
3162                    assert_eq!(
3163                        vec_a, vec_b,
3164                        "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
3165                    );
3166                });
3167            }
3168        }
3169    }
3170
3171    #[test]
3172    fn arrow_writer_string_dictionary() {
3173        // define schema
3174        #[allow(deprecated)]
3175        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3176            "dictionary",
3177            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3178            true,
3179            42,
3180            true,
3181        )]));
3182
3183        // create some data
3184        let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3185            .iter()
3186            .copied()
3187            .collect();
3188
3189        // build a record batch
3190        one_column_roundtrip_with_schema(Arc::new(d), schema);
3191    }
3192
3193    #[test]
3194    fn arrow_writer_test_type_compatibility() {
3195        fn ensure_compatible_write<T1, T2>(array1: T1, array2: T2, expected_result: T1)
3196        where
3197            T1: Array + 'static,
3198            T2: Array + 'static,
3199        {
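            // Writes `array1` under its own schema, then writes `array2` (a different but
            // write-compatible type) to the same writer, and verifies the file reads back
            // as `expected_result` under the first schema.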
3200            let schema1 = Arc::new(Schema::new(vec![Field::new(
3201                "a",
3202                array1.data_type().clone(),
3203                false,
3204            )]));
3205
3206            let file = tempfile().unwrap();
3207            let mut writer =
3208                ArrowWriter::try_new(file.try_clone().unwrap(), schema1.clone(), None).unwrap();
3209
3210            let rb1 = RecordBatch::try_new(schema1.clone(), vec![Arc::new(array1)]).unwrap();
3211            writer.write(&rb1).unwrap();
3212
3213            let schema2 = Arc::new(Schema::new(vec![Field::new(
3214                "a",
3215                array2.data_type().clone(),
3216                false,
3217            )]));
3218            let rb2 = RecordBatch::try_new(schema2, vec![Arc::new(array2)]).unwrap();
3219            writer.write(&rb2).unwrap();
3220
3221            writer.close().unwrap();
3222
3223            let mut record_batch_reader =
3224                ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
3225            let actual_batch = record_batch_reader.next().unwrap().unwrap();
3226
3227            let expected_batch =
3228                RecordBatch::try_new(schema1, vec![Arc::new(expected_result)]).unwrap();
3229            assert_eq!(actual_batch, expected_batch);
3230        }
3231
3232        // check compatibility between native and dictionaries
3233
3234        ensure_compatible_write(
3235            DictionaryArray::new(
3236                UInt8Array::from_iter_values(vec![0]),
3237                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3238            ),
3239            StringArray::from_iter_values(vec!["barquet"]),
3240            DictionaryArray::new(
3241                UInt8Array::from_iter_values(vec![0, 1]),
3242                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3243            ),
3244        );
3245
3246        ensure_compatible_write(
3247            StringArray::from_iter_values(vec!["parquet"]),
3248            DictionaryArray::new(
3249                UInt8Array::from_iter_values(vec![0]),
3250                Arc::new(StringArray::from_iter_values(vec!["barquet"])),
3251            ),
3252            StringArray::from_iter_values(vec!["parquet", "barquet"]),
3253        );
3254
3255        // check compatibility between dictionaries with different key types
3256
3257        ensure_compatible_write(
3258            DictionaryArray::new(
3259                UInt8Array::from_iter_values(vec![0]),
3260                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3261            ),
3262            DictionaryArray::new(
3263                UInt16Array::from_iter_values(vec![0]),
3264                Arc::new(StringArray::from_iter_values(vec!["barquet"])),
3265            ),
3266            DictionaryArray::new(
3267                UInt8Array::from_iter_values(vec![0, 1]),
3268                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3269            ),
3270        );
3271
3272        // check compatibility between dictionaries with different value types
3273        ensure_compatible_write(
3274            DictionaryArray::new(
3275                UInt8Array::from_iter_values(vec![0]),
3276                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3277            ),
3278            DictionaryArray::new(
3279                UInt8Array::from_iter_values(vec![0]),
3280                Arc::new(LargeStringArray::from_iter_values(vec!["barquet"])),
3281            ),
3282            DictionaryArray::new(
3283                UInt8Array::from_iter_values(vec![0, 1]),
3284                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3285            ),
3286        );
3287
3288        // check compatibility between a dictionary and a native array with a different type
3289        ensure_compatible_write(
3290            DictionaryArray::new(
3291                UInt8Array::from_iter_values(vec![0]),
3292                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3293            ),
3294            LargeStringArray::from_iter_values(vec!["barquet"]),
3295            DictionaryArray::new(
3296                UInt8Array::from_iter_values(vec![0, 1]),
3297                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3298            ),
3299        );
3300
3301        // check compatibility for string types
3302
3303        ensure_compatible_write(
3304            StringArray::from_iter_values(vec!["parquet"]),
3305            LargeStringArray::from_iter_values(vec!["barquet"]),
3306            StringArray::from_iter_values(vec!["parquet", "barquet"]),
3307        );
3308
3309        ensure_compatible_write(
3310            LargeStringArray::from_iter_values(vec!["parquet"]),
3311            StringArray::from_iter_values(vec!["barquet"]),
3312            LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
3313        );
3314
3315        ensure_compatible_write(
3316            StringArray::from_iter_values(vec!["parquet"]),
3317            StringViewArray::from_iter_values(vec!["barquet"]),
3318            StringArray::from_iter_values(vec!["parquet", "barquet"]),
3319        );
3320
3321        ensure_compatible_write(
3322            StringViewArray::from_iter_values(vec!["parquet"]),
3323            StringArray::from_iter_values(vec!["barquet"]),
3324            StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
3325        );
3326
3327        ensure_compatible_write(
3328            LargeStringArray::from_iter_values(vec!["parquet"]),
3329            StringViewArray::from_iter_values(vec!["barquet"]),
3330            LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
3331        );
3332
3333        ensure_compatible_write(
3334            StringViewArray::from_iter_values(vec!["parquet"]),
3335            LargeStringArray::from_iter_values(vec!["barquet"]),
3336            StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
3337        );
3338
3339        // check compatibility for binary types
3340
3341        ensure_compatible_write(
3342            BinaryArray::from_iter_values(vec![b"parquet"]),
3343            LargeBinaryArray::from_iter_values(vec![b"barquet"]),
3344            BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3345        );
3346
3347        ensure_compatible_write(
3348            LargeBinaryArray::from_iter_values(vec![b"parquet"]),
3349            BinaryArray::from_iter_values(vec![b"barquet"]),
3350            LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3351        );
3352
3353        ensure_compatible_write(
3354            BinaryArray::from_iter_values(vec![b"parquet"]),
3355            BinaryViewArray::from_iter_values(vec![b"barquet"]),
3356            BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3357        );
3358
3359        ensure_compatible_write(
3360            BinaryViewArray::from_iter_values(vec![b"parquet"]),
3361            BinaryArray::from_iter_values(vec![b"barquet"]),
3362            BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
3363        );
3364
3365        ensure_compatible_write(
3366            BinaryViewArray::from_iter_values(vec![b"parquet"]),
3367            LargeBinaryArray::from_iter_values(vec![b"barquet"]),
3368            BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
3369        );
3370
3371        ensure_compatible_write(
3372            LargeBinaryArray::from_iter_values(vec![b"parquet"]),
3373            BinaryViewArray::from_iter_values(vec![b"barquet"]),
3374            LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3375        );
3376
3377        // check compatibility for list types
3378
3379        let list_field_metadata = HashMap::from_iter(vec![(
3380            PARQUET_FIELD_ID_META_KEY.to_string(),
3381            "1".to_string(),
3382        )]);
3383        let list_field = Field::new_list_field(DataType::Int32, false);
3384
3385        let values1 = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4]));
3386        let offsets1 = OffsetBuffer::new(vec![0, 2, 5].into());
3387
3388        let values2 = Arc::new(Int32Array::from(vec![5, 6, 7, 8, 9]));
3389        let offsets2 = OffsetBuffer::new(vec![0, 3, 5].into());
3390
3391        let values_expected = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]));
3392        let offsets_expected = OffsetBuffer::new(vec![0, 2, 5, 8, 10].into());
3393
3394        ensure_compatible_write(
3395            // when the initial schema has the metadata ...
3396            ListArray::try_new(
3397                Arc::new(
3398                    list_field
3399                        .clone()
3400                        .with_metadata(list_field_metadata.clone()),
3401                ),
3402                offsets1,
3403                values1,
3404                None,
3405            )
3406            .unwrap(),
3407            // ... and some intermediate schema doesn't have the metadata
3408            ListArray::try_new(Arc::new(list_field.clone()), offsets2, values2, None).unwrap(),
3409            // ... the write will still go through, and the resulting schema will inherit the initial metadata
3410            ListArray::try_new(
3411                Arc::new(
3412                    list_field
3413                        .clone()
3414                        .with_metadata(list_field_metadata.clone()),
3415                ),
3416                offsets_expected,
3417                values_expected,
3418                None,
3419            )
3420            .unwrap(),
3421        );
3422    }
3423
3424    #[test]
3425    fn arrow_writer_primitive_dictionary() {
3426        // define schema
3427        #[allow(deprecated)]
3428        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3429            "dictionary",
3430            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
3431            true,
3432            42,
3433            true,
3434        )]));
3435
3436        // create some data
3437        let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
3438        builder.append(12345678).unwrap();
3439        builder.append_null();
3440        builder.append(22345678).unwrap();
3441        builder.append(12345678).unwrap();
3442        let d = builder.finish();
3443
3444        one_column_roundtrip_with_schema(Arc::new(d), schema);
3445    }
3446
3447    #[test]
3448    fn arrow_writer_decimal32_dictionary() {
3449        let integers = vec![12345, 56789, 34567];
3450
3451        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3452
3453        let values = Decimal32Array::from(integers.clone())
3454            .with_precision_and_scale(5, 2)
3455            .unwrap();
3456
3457        let array = DictionaryArray::new(keys, Arc::new(values));
3458        one_column_roundtrip(Arc::new(array.clone()), true);
3459
3460        let values = Decimal32Array::from(integers)
3461            .with_precision_and_scale(9, 2)
3462            .unwrap();
3463
3464        let array = array.with_values(Arc::new(values));
3465        one_column_roundtrip(Arc::new(array), true);
3466    }
3467
3468    #[test]
3469    fn arrow_writer_decimal64_dictionary() {
3470        let integers = vec![12345, 56789, 34567];
3471
3472        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3473
3474        let values = Decimal64Array::from(integers.clone())
3475            .with_precision_and_scale(5, 2)
3476            .unwrap();
3477
3478        let array = DictionaryArray::new(keys, Arc::new(values));
3479        one_column_roundtrip(Arc::new(array.clone()), true);
3480
3481        let values = Decimal64Array::from(integers)
3482            .with_precision_and_scale(12, 2)
3483            .unwrap();
3484
3485        let array = array.with_values(Arc::new(values));
3486        one_column_roundtrip(Arc::new(array), true);
3487    }
3488
3489    #[test]
3490    fn arrow_writer_decimal128_dictionary() {
3491        let integers = vec![12345, 56789, 34567];
3492
3493        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3494
3495        let values = Decimal128Array::from(integers.clone())
3496            .with_precision_and_scale(5, 2)
3497            .unwrap();
3498
3499        let array = DictionaryArray::new(keys, Arc::new(values));
3500        one_column_roundtrip(Arc::new(array.clone()), true);
3501
3502        let values = Decimal128Array::from(integers)
3503            .with_precision_and_scale(12, 2)
3504            .unwrap();
3505
3506        let array = array.with_values(Arc::new(values));
3507        one_column_roundtrip(Arc::new(array), true);
3508    }
3509
3510    #[test]
3511    fn arrow_writer_decimal256_dictionary() {
3512        let integers = vec![
3513            i256::from_i128(12345),
3514            i256::from_i128(56789),
3515            i256::from_i128(34567),
3516        ];
3517
3518        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3519
3520        let values = Decimal256Array::from(integers.clone())
3521            .with_precision_and_scale(5, 2)
3522            .unwrap();
3523
3524        let array = DictionaryArray::new(keys, Arc::new(values));
3525        one_column_roundtrip(Arc::new(array.clone()), true);
3526
3527        let values = Decimal256Array::from(integers)
3528            .with_precision_and_scale(12, 2)
3529            .unwrap();
3530
3531        let array = array.with_values(Arc::new(values));
3532        one_column_roundtrip(Arc::new(array), true);
3533    }
3534
3535    #[test]
3536    fn arrow_writer_string_dictionary_unsigned_index() {
3537        // define schema
3538        #[allow(deprecated)]
3539        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3540            "dictionary",
3541            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3542            true,
3543            42,
3544            true,
3545        )]));
3546
3547        // create some data
3548        let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3549            .iter()
3550            .copied()
3551            .collect();
3552
3553        one_column_roundtrip_with_schema(Arc::new(d), schema);
3554    }
3555
3556    #[test]
3557    fn u32_min_max() {
3558        // check values roundtrip through parquet
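        // UInt32 values are stored as physical INT32 with an unsigned logical type, so the
        // statistics read back below are Statistics::Int32 and are reinterpreted via `as u32`.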
3559        let src = [
3560            u32::MIN,
3561            u32::MIN + 1,
3562            (i32::MAX as u32) - 1,
3563            i32::MAX as u32,
3564            (i32::MAX as u32) + 1,
3565            u32::MAX - 1,
3566            u32::MAX,
3567        ];
3568        let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
3569        let files = one_column_roundtrip(values, false);
3570
3571        for file in files {
3572            // check statistics are valid
3573            let reader = SerializedFileReader::new(file).unwrap();
3574            let metadata = reader.metadata();
3575
3576            let mut row_offset = 0;
3577            for row_group in metadata.row_groups() {
3578                assert_eq!(row_group.num_columns(), 1);
3579                let column = row_group.column(0);
3580
3581                let num_values = column.num_values() as usize;
3582                let src_slice = &src[row_offset..row_offset + num_values];
3583                row_offset += column.num_values() as usize;
3584
3585                let stats = column.statistics().unwrap();
3586                if let Statistics::Int32(stats) = stats {
3587                    assert_eq!(
3588                        *stats.min_opt().unwrap() as u32,
3589                        *src_slice.iter().min().unwrap()
3590                    );
3591                    assert_eq!(
3592                        *stats.max_opt().unwrap() as u32,
3593                        *src_slice.iter().max().unwrap()
3594                    );
3595                } else {
3596                    panic!("Statistics::Int32 missing")
3597                }
3598            }
3599        }
3600    }
3601
3602    #[test]
3603    fn u64_min_max() {
3604        // check values roundtrip through parquet
3605        let src = [
3606            u64::MIN,
3607            u64::MIN + 1,
3608            (i64::MAX as u64) - 1,
3609            i64::MAX as u64,
3610            (i64::MAX as u64) + 1,
3611            u64::MAX - 1,
3612            u64::MAX,
3613        ];
3614        let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
3615        let files = one_column_roundtrip(values, false);
3616
3617        for file in files {
3618            // check statistics are valid
3619            let reader = SerializedFileReader::new(file).unwrap();
3620            let metadata = reader.metadata();
3621
3622            let mut row_offset = 0;
3623            for row_group in metadata.row_groups() {
3624                assert_eq!(row_group.num_columns(), 1);
3625                let column = row_group.column(0);
3626
3627                let num_values = column.num_values() as usize;
3628                let src_slice = &src[row_offset..row_offset + num_values];
3629                row_offset += column.num_values() as usize;
3630
3631                let stats = column.statistics().unwrap();
3632                if let Statistics::Int64(stats) = stats {
3633                    assert_eq!(
3634                        *stats.min_opt().unwrap() as u64,
3635                        *src_slice.iter().min().unwrap()
3636                    );
3637                    assert_eq!(
3638                        *stats.max_opt().unwrap() as u64,
3639                        *src_slice.iter().max().unwrap()
3640                    );
3641                } else {
3642                    panic!("Statistics::Int64 missing")
3643                }
3644            }
3645        }
3646    }
3647
3648    #[test]
3649    fn statistics_null_counts_only_nulls() {
3650        // check that null-count statistics for "only NULL"-columns are correct
3651        let values = Arc::new(UInt64Array::from(vec![None, None]));
3652        let files = one_column_roundtrip(values, true);
3653
3654        for file in files {
3655            // check statistics are valid
3656            let reader = SerializedFileReader::new(file).unwrap();
3657            let metadata = reader.metadata();
3658            assert_eq!(metadata.num_row_groups(), 1);
3659            let row_group = metadata.row_group(0);
3660            assert_eq!(row_group.num_columns(), 1);
3661            let column = row_group.column(0);
3662            let stats = column.statistics().unwrap();
3663            assert_eq!(stats.null_count_opt(), Some(2));
3664        }
3665    }
3666
3667    #[test]
3668    fn test_list_of_struct_roundtrip() {
3669        // define schema
3670        let int_field = Field::new("a", DataType::Int32, true);
3671        let int_field2 = Field::new("b", DataType::Int32, true);
3672
3673        let int_builder = Int32Builder::with_capacity(10);
3674        let int_builder2 = Int32Builder::with_capacity(10);
3675
3676        let struct_builder = StructBuilder::new(
3677            vec![int_field, int_field2],
3678            vec![Box::new(int_builder), Box::new(int_builder2)],
3679        );
3680        let mut list_builder = ListBuilder::new(struct_builder);
3681
3682        // Construct the following array
3683        // [{a: 1, b: 2}], [], null, [null, null], [{a: null, b: 3}], [{a: 2, b: null}]
3684
3685        // [{a: 1, b: 2}]
3686        let values = list_builder.values();
3687        values
3688            .field_builder::<Int32Builder>(0)
3689            .unwrap()
3690            .append_value(1);
3691        values
3692            .field_builder::<Int32Builder>(1)
3693            .unwrap()
3694            .append_value(2);
3695        values.append(true);
3696        list_builder.append(true);
3697
3698        // []
3699        list_builder.append(true);
3700
3701        // null
3702        list_builder.append(false);
3703
3704        // [null, null]
3705        let values = list_builder.values();
3706        values
3707            .field_builder::<Int32Builder>(0)
3708            .unwrap()
3709            .append_null();
3710        values
3711            .field_builder::<Int32Builder>(1)
3712            .unwrap()
3713            .append_null();
3714        values.append(false);
3715        values
3716            .field_builder::<Int32Builder>(0)
3717            .unwrap()
3718            .append_null();
3719        values
3720            .field_builder::<Int32Builder>(1)
3721            .unwrap()
3722            .append_null();
3723        values.append(false);
3724        list_builder.append(true);
3725
3726        // [{a: null, b: 3}]
3727        let values = list_builder.values();
3728        values
3729            .field_builder::<Int32Builder>(0)
3730            .unwrap()
3731            .append_null();
3732        values
3733            .field_builder::<Int32Builder>(1)
3734            .unwrap()
3735            .append_value(3);
3736        values.append(true);
3737        list_builder.append(true);
3738
3739        // [{a: 2, b: null}]
3740        let values = list_builder.values();
3741        values
3742            .field_builder::<Int32Builder>(0)
3743            .unwrap()
3744            .append_value(2);
3745        values
3746            .field_builder::<Int32Builder>(1)
3747            .unwrap()
3748            .append_null();
3749        values.append(true);
3750        list_builder.append(true);
3751
3752        let array = Arc::new(list_builder.finish());
3753
3754        one_column_roundtrip(array, true);
3755    }
3756
3757    fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
3758        metadata.row_groups().iter().map(|x| x.num_rows()).collect()
3759    }
3760
3761    #[test]
3762    fn test_aggregates_records() {
3763        let arrays = [
3764            Int32Array::from((0..100).collect::<Vec<_>>()),
3765            Int32Array::from((0..50).collect::<Vec<_>>()),
3766            Int32Array::from((200..500).collect::<Vec<_>>()),
3767        ];
3768
3769        let schema = Arc::new(Schema::new(vec![Field::new(
3770            "int",
3771            ArrowDataType::Int32,
3772            false,
3773        )]));
3774
3775        let file = tempfile::tempfile().unwrap();
3776
3777        let props = WriterProperties::builder()
3778            .set_max_row_group_size(200)
3779            .build();
3780
3781        let mut writer =
3782            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
3783
3784        for array in arrays {
3785            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
3786            writer.write(&batch).unwrap();
3787        }
3788
3789        writer.close().unwrap();
3790
3791        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
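        // 450 input rows (100 + 50 + 300) written with a 200-row limit should have been flushed
        // as row groups of 200, 200 and 50 rows.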
3792        assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);
3793
3794        let batches = builder
3795            .with_batch_size(100)
3796            .build()
3797            .unwrap()
3798            .collect::<ArrowResult<Vec<_>>>()
3799            .unwrap();
3800
3801        assert_eq!(batches.len(), 5);
3802        assert!(batches.iter().all(|x| x.num_columns() == 1));
3803
3804        let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3805
3806        assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
3807
3808        let values: Vec<_> = batches
3809            .iter()
3810            .flat_map(|x| {
3811                x.column(0)
3812                    .as_any()
3813                    .downcast_ref::<Int32Array>()
3814                    .unwrap()
3815                    .values()
3816                    .iter()
3817                    .cloned()
3818            })
3819            .collect();
3820
3821        let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
3822        assert_eq!(&values, &expected_values)
3823    }
3824
3825    #[test]
3826    fn complex_aggregate() {
3827        // Tests aggregating nested data
3828        let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
3829        let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
3830        let struct_a = Arc::new(Field::new(
3831            "struct_a",
3832            DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
3833            true,
3834        ));
3835
3836        let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
3837        let struct_b = Arc::new(Field::new(
3838            "struct_b",
3839            DataType::Struct(vec![list_a.clone()].into()),
3840            false,
3841        ));
3842
3843        let schema = Arc::new(Schema::new(vec![struct_b]));
3844
3845        // create nested data
3846        let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
3847        let field_b_array =
3848            Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);
3849
3850        let struct_a_array = StructArray::from(vec![
3851            (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
3852            (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
3853        ]);
3854
3855        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3856            .len(5)
3857            .add_buffer(Buffer::from_iter(vec![
3858                0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
3859            ]))
3860            .null_bit_buffer(Some(Buffer::from_iter(vec![
3861                true, false, true, false, true,
3862            ])))
3863            .child_data(vec![struct_a_array.into_data()])
3864            .build()
3865            .unwrap();
3866
3867        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3868        let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);
3869
3870        let batch1 =
3871            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3872                .unwrap();
3873
3874        let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
3875        let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);
3876
3877        let struct_a_array = StructArray::from(vec![
3878            (field_a, Arc::new(field_a_array) as ArrayRef),
3879            (field_b, Arc::new(field_b_array) as ArrayRef),
3880        ]);
3881
3882        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3883            .len(2)
3884            .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
3885            .child_data(vec![struct_a_array.into_data()])
3886            .build()
3887            .unwrap();
3888
3889        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3890        let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);
3891
3892        let batch2 =
3893            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3894                .unwrap();
3895
3896        let batches = &[batch1, batch2];
3897
3898        // Verify data is as expected
3899
3900        let expected = r#"
3901            +-------------------------------------------------------------------------------------------------------+
3902            | struct_b                                                                                              |
3903            +-------------------------------------------------------------------------------------------------------+
3904            | {list: [{leaf_a: 1, leaf_b: 1}]}                                                                      |
3905            | {list: }                                                                                              |
3906            | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]}                                               |
3907            | {list: }                                                                                              |
3908            | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]}                                                |
3909            | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
3910            | {list: [{leaf_a: 10, leaf_b: }]}                                                                      |
3911            +-------------------------------------------------------------------------------------------------------+
3912        "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");
3913
3914        let actual = pretty_format_batches(batches).unwrap().to_string();
3915        assert_eq!(actual, expected);
3916
3917        // Write data
3918        let file = tempfile::tempfile().unwrap();
3919        let props = WriterProperties::builder()
3920            .set_max_row_group_size(6)
3921            .build();
3922
3923        let mut writer =
3924            ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
3925
3926        for batch in batches {
3927            writer.write(batch).unwrap();
3928        }
3929        writer.close().unwrap();
3930
3931        // Read Data
3932        // Should have written entire first batch and first row of second to the first row group
3933        // leaving a single row in the second row group
3934
3935        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3936        assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);
3937
3938        let batches = builder
3939            .with_batch_size(2)
3940            .build()
3941            .unwrap()
3942            .collect::<ArrowResult<Vec<_>>>()
3943            .unwrap();
3944
3945        assert_eq!(batches.len(), 4);
3946        let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3947        assert_eq!(&batch_counts, &[2, 2, 2, 1]);
3948
3949        let actual = pretty_format_batches(&batches).unwrap().to_string();
3950        assert_eq!(actual, expected);
3951    }
3952
3953    #[test]
3954    fn test_arrow_writer_metadata() {
3955        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3956        let file_schema = batch_schema.clone().with_metadata(
3957            vec![("foo".to_string(), "bar".to_string())]
3958                .into_iter()
3959                .collect(),
3960        );
3961
3962        let batch = RecordBatch::try_new(
3963            Arc::new(batch_schema),
3964            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3965        )
3966        .unwrap();
3967
3968        let mut buf = Vec::with_capacity(1024);
3969        let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
3970        writer.write(&batch).unwrap();
3971        writer.close().unwrap();
3972    }
3973
3974    #[test]
3975    fn test_arrow_writer_nullable() {
3976        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3977        let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
3978        let file_schema = Arc::new(file_schema);
3979
3980        let batch = RecordBatch::try_new(
3981            Arc::new(batch_schema),
3982            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3983        )
3984        .unwrap();
3985
3986        let mut buf = Vec::with_capacity(1024);
3987        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3988        writer.write(&batch).unwrap();
3989        writer.close().unwrap();
3990
3991        let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
3992        let back = read.next().unwrap().unwrap();
3993        assert_eq!(back.schema(), file_schema);
3994        assert_ne!(back.schema(), batch.schema());
3995        assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
3996    }
3997
3998    #[test]
3999    fn in_progress_accounting() {
4000        // define schema
4001        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
4002
4003        // create some data
4004        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
4005
4006        // build a record batch
4007        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
4008
4009        let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
4010
4011        // starts empty
4012        assert_eq!(writer.in_progress_size(), 0);
4013        assert_eq!(writer.in_progress_rows(), 0);
4014        assert_eq!(writer.memory_size(), 0);
4015        assert_eq!(writer.bytes_written(), 4); // Initial "PAR1" magic header
4016        writer.write(&batch).unwrap();
4017
4018        // updated on write
4019        let initial_size = writer.in_progress_size();
4020        assert!(initial_size > 0);
4021        assert_eq!(writer.in_progress_rows(), 5);
4022        let initial_memory = writer.memory_size();
4023        assert!(initial_memory > 0);
4024        // memory estimate is at least as large as the estimated encoded size
4025        assert!(
4026            initial_size <= initial_memory,
4027            "{initial_size} <= {initial_memory}"
4028        );
4029
4030        // updated on second write
4031        writer.write(&batch).unwrap();
4032        assert!(writer.in_progress_size() > initial_size);
4033        assert_eq!(writer.in_progress_rows(), 10);
4034        assert!(writer.memory_size() > initial_memory);
4035        assert!(
4036            writer.in_progress_size() <= writer.memory_size(),
4037            "in_progress_size {} <= memory_size {}",
4038            writer.in_progress_size(),
4039            writer.memory_size()
4040        );
4041
4042        // in progress tracking is cleared, but the overall data written is updated
4043        let pre_flush_bytes_written = writer.bytes_written();
4044        writer.flush().unwrap();
4045        assert_eq!(writer.in_progress_size(), 0);
4046        assert_eq!(writer.memory_size(), 0);
4047        assert!(writer.bytes_written() > pre_flush_bytes_written);
4048
4049        writer.close().unwrap();
4050    }
4051
4052    #[test]
4053    fn test_writer_all_null() {
4054        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
4055        let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
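        // Column "b" has five physical zeroes but an all-null validity buffer, so every slot is null.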
4056        let batch = RecordBatch::try_from_iter(vec![
4057            ("a", Arc::new(a) as ArrayRef),
4058            ("b", Arc::new(b) as ArrayRef),
4059        ])
4060        .unwrap();
4061
4062        let mut buf = Vec::with_capacity(1024);
4063        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
4064        writer.write(&batch).unwrap();
4065        writer.close().unwrap();
4066
4067        let bytes = Bytes::from(buf);
4068        let options = ReadOptionsBuilder::new().with_page_index().build();
4069        let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
4070        let index = reader.metadata().offset_index().unwrap();
4071
4072        assert_eq!(index.len(), 1);
4073        assert_eq!(index[0].len(), 2); // 2 columns
4074        assert_eq!(index[0][0].page_locations().len(), 1); // 1 page
4075        assert_eq!(index[0][1].page_locations().len(), 1); // 1 page
4076    }
4077
4078    #[test]
4079    fn test_disabled_statistics_with_page() {
4080        let file_schema = Schema::new(vec![
4081            Field::new("a", DataType::Utf8, true),
4082            Field::new("b", DataType::Utf8, true),
4083        ]);
4084        let file_schema = Arc::new(file_schema);
4085
4086        let batch = RecordBatch::try_new(
4087            file_schema.clone(),
4088            vec![
4089                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
4090                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
4091            ],
4092        )
4093        .unwrap();
4094
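        // Disable statistics file-wide, then re-enable them at page granularity for column "a"
        // only; "a" should therefore get a column index and chunk statistics while "b" does not.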
4095        let props = WriterProperties::builder()
4096            .set_statistics_enabled(EnabledStatistics::None)
4097            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
4098            .build();
4099
4100        let mut buf = Vec::with_capacity(1024);
4101        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
4102        writer.write(&batch).unwrap();
4103
4104        let metadata = writer.close().unwrap();
4105        assert_eq!(metadata.num_row_groups(), 1);
4106        let row_group = metadata.row_group(0);
4107        assert_eq!(row_group.num_columns(), 2);
4108        // Column "a" has both offset and column index, as requested
4109        assert!(row_group.column(0).offset_index_offset().is_some());
4110        assert!(row_group.column(0).column_index_offset().is_some());
4111        // Column "b" should only have offset index
4112        assert!(row_group.column(1).offset_index_offset().is_some());
4113        assert!(row_group.column(1).column_index_offset().is_none());
4114
4115        let options = ReadOptionsBuilder::new().with_page_index().build();
4116        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
4117
4118        let row_group = reader.get_row_group(0).unwrap();
4119        let a_col = row_group.metadata().column(0);
4120        let b_col = row_group.metadata().column(1);
4121
4122        // Column chunk of column "a" should have chunk level statistics
4123        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
4124            let min = byte_array_stats.min_opt().unwrap();
4125            let max = byte_array_stats.max_opt().unwrap();
4126
4127            assert_eq!(min.as_bytes(), b"a");
4128            assert_eq!(max.as_bytes(), b"d");
4129        } else {
4130            panic!("expecting Statistics::ByteArray");
4131        }
4132
4133        // The column chunk for column "b" shouldn't have statistics
4134        assert!(b_col.statistics().is_none());
4135
4136        let offset_index = reader.metadata().offset_index().unwrap();
4137        assert_eq!(offset_index.len(), 1); // 1 row group
4138        assert_eq!(offset_index[0].len(), 2); // 2 columns
4139
4140        let column_index = reader.metadata().column_index().unwrap();
4141        assert_eq!(column_index.len(), 1); // 1 row group
4142        assert_eq!(column_index[0].len(), 2); // 2 columns
4143
4144        let a_idx = &column_index[0][0];
4145        assert!(
4146            matches!(a_idx, ColumnIndexMetaData::BYTE_ARRAY(_)),
4147            "{a_idx:?}"
4148        );
4149        let b_idx = &column_index[0][1];
4150        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
4151    }
4152
4153    #[test]
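        // Statistics are disabled for the file as a whole but re-enabled at the
        // chunk level for column "a": "a" should get chunk statistics but no
        // column index, and "b" gets neither.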
4154    fn test_disabled_statistics_with_chunk() {
4155        let file_schema = Schema::new(vec![
4156            Field::new("a", DataType::Utf8, true),
4157            Field::new("b", DataType::Utf8, true),
4158        ]);
4159        let file_schema = Arc::new(file_schema);
4160
4161        let batch = RecordBatch::try_new(
4162            file_schema.clone(),
4163            vec![
4164                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
4165                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
4166            ],
4167        )
4168        .unwrap();
4169
4170        let props = WriterProperties::builder()
4171            .set_statistics_enabled(EnabledStatistics::None)
4172            .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
4173            .build();
4174
4175        let mut buf = Vec::with_capacity(1024);
4176        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
4177        writer.write(&batch).unwrap();
4178
4179        let metadata = writer.close().unwrap();
4180        assert_eq!(metadata.num_row_groups(), 1);
4181        let row_group = metadata.row_group(0);
4182        assert_eq!(row_group.num_columns(), 2);
4183        // Column "a" should only have offset index
4184        assert!(row_group.column(0).offset_index_offset().is_some());
4185        assert!(row_group.column(0).column_index_offset().is_none());
4186        // Column "b" should only have offset index
4187        assert!(row_group.column(1).offset_index_offset().is_some());
4188        assert!(row_group.column(1).column_index_offset().is_none());
4189
4190        let options = ReadOptionsBuilder::new().with_page_index().build();
4191        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
4192
4193        let row_group = reader.get_row_group(0).unwrap();
4194        let a_col = row_group.metadata().column(0);
4195        let b_col = row_group.metadata().column(1);
4196
4197        // Column chunk of column "a" should have chunk level statistics
4198        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
4199            let min = byte_array_stats.min_opt().unwrap();
4200            let max = byte_array_stats.max_opt().unwrap();
4201
4202            assert_eq!(min.as_bytes(), b"a");
4203            assert_eq!(max.as_bytes(), b"d");
4204        } else {
4205            panic!("expecting Statistics::ByteArray");
4206        }
4207
4208        // The column chunk for column "b"  shouldn't have statistics
4209        assert!(b_col.statistics().is_none());
4210
4211        let column_index = reader.metadata().column_index().unwrap();
4212        assert_eq!(column_index.len(), 1); // 1 row group
4213        assert_eq!(column_index[0].len(), 2); // 2 columns
4214
4215        let a_idx = &column_index[0][0];
4216        assert!(matches!(a_idx, ColumnIndexMetaData::NONE), "{a_idx:?}");
4217        let b_idx = &column_index[0][1];
4218        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
4219    }
4220
4221    #[test]
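        // With `with_skip_arrow_metadata(true)` the serialized Arrow schema is not
        // stored in the file's key/value metadata; the reader must still recover an
        // equivalent Arrow schema from the Parquet schema alone.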
4222    fn test_arrow_writer_skip_metadata() {
4223        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
4224        let file_schema = Arc::new(batch_schema.clone());
4225
4226        let batch = RecordBatch::try_new(
4227            Arc::new(batch_schema),
4228            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4229        )
4230        .unwrap();
4231        let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
4232
4233        let mut buf = Vec::with_capacity(1024);
4234        let mut writer =
4235            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
4236        writer.write(&batch).unwrap();
4237        writer.close().unwrap();
4238
4239        let bytes = Bytes::from(buf);
4240        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
4241        assert_eq!(file_schema, *reader_builder.schema());
4242        if let Some(key_value_metadata) = reader_builder
4243            .metadata()
4244            .file_metadata()
4245            .key_value_metadata()
4246        {
4247            assert!(
4248                !key_value_metadata
4249                    .iter()
4250                    .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY)
4251            );
4252        }
4253    }
4254
4255    #[test]
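        // Writing a batch whose schema does not match the schema the writer was
        // created with should fail with a descriptive error.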
4256    fn mismatched_schemas() {
4257        let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
4258        let file_schema = Arc::new(Schema::new(vec![Field::new(
4259            "temperature",
4260            DataType::Float64,
4261            false,
4262        )]));
4263
4264        let batch = RecordBatch::try_new(
4265            Arc::new(batch_schema),
4266            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4267        )
4268        .unwrap();
4269
4270        let mut buf = Vec::with_capacity(1024);
4271        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
4272
4273        let err = writer.write(&batch).unwrap_err().to_string();
4274        assert_eq!(
4275            err,
4276            "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
4277        );
4278    }
4279
4280    #[test]
4281    // https://github.com/apache/arrow-rs/issues/6988
4282    fn test_roundtrip_empty_schema() {
4283        // create empty record batch with empty schema
4284        let empty_batch = RecordBatch::try_new_with_options(
4285            Arc::new(Schema::empty()),
4286            vec![],
4287            &RecordBatchOptions::default().with_row_count(Some(0)),
4288        )
4289        .unwrap();
4290
4291        // write to parquet
4292        let mut parquet_bytes: Vec<u8> = Vec::new();
4293        let mut writer =
4294            ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
4295        writer.write(&empty_batch).unwrap();
4296        writer.close().unwrap();
4297
4298        // read from parquet
4299        let bytes = Bytes::from(parquet_bytes);
4300        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
4301        assert_eq!(reader.schema(), &empty_batch.schema());
4302        let batches: Vec<_> = reader
4303            .build()
4304            .unwrap()
4305            .collect::<ArrowResult<Vec<_>>>()
4306            .unwrap();
4307        assert_eq!(batches.len(), 0);
4308    }
4309
4310    #[test]
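        // `EnabledStatistics::Page` computes page-level statistics (used for the
        // column index) but does not embed them in the data page headers unless
        // `set_write_page_header_statistics(true)` is also set.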
4311    fn test_page_stats_not_written_by_default() {
4312        let string_field = Field::new("a", DataType::Utf8, false);
4313        let schema = Schema::new(vec![string_field]);
4314        let raw_string_values = vec!["Blart Versenwald III"];
4315        let string_values = StringArray::from(raw_string_values.clone());
4316        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();
4317
4318        let props = WriterProperties::builder()
4319            .set_statistics_enabled(EnabledStatistics::Page)
4320            .set_dictionary_enabled(false)
4321            .set_encoding(Encoding::PLAIN)
4322            .set_compression(crate::basic::Compression::UNCOMPRESSED)
4323            .build();
4324
4325        let file = roundtrip_opts(&batch, props);
4326
4327        // read file and decode page headers
4328        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
4329
4330        // decode first page header (the file starts with the 4 byte "PAR1" magic)
4331        let first_page = &file[4..];
4332        let mut prot = ThriftSliceInputProtocol::new(first_page);
4333        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
4334        let stats = hdr.data_page_header.unwrap().statistics;
4335
4336        assert!(stats.is_none());
4337    }
4338
4339    #[test]
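        // With `set_write_page_header_statistics(true)` the data page header should
        // carry exact min/max values for the page.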
4340    fn test_page_stats_when_enabled() {
4341        let string_field = Field::new("a", DataType::Utf8, false);
4342        let schema = Schema::new(vec![string_field]);
4343        let raw_string_values = vec!["Blart Versenwald III", "Andrew Lamb"];
4344        let string_values = StringArray::from(raw_string_values.clone());
4345        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();
4346
4347        let props = WriterProperties::builder()
4348            .set_statistics_enabled(EnabledStatistics::Page)
4349            .set_dictionary_enabled(false)
4350            .set_encoding(Encoding::PLAIN)
4351            .set_write_page_header_statistics(true)
4352            .set_compression(crate::basic::Compression::UNCOMPRESSED)
4353            .build();
4354
4355        let file = roundtrip_opts(&batch, props);
4356
4357        // read file and decode page headers
4358        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
4359
4360        // decode first page header
4361        let first_page = &file[4..];
4362        let mut prot = ThriftSliceInputProtocol::new(first_page);
4363        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
4364        let stats = hdr.data_page_header.unwrap().statistics;
4365
4366        let stats = stats.unwrap();
4367        // check that min/max were actually written to the page
4368        assert!(stats.is_max_value_exact.unwrap());
4369        assert!(stats.is_min_value_exact.unwrap());
4370        assert_eq!(stats.max_value.unwrap(), "Blart Versenwald III".as_bytes());
4371        assert_eq!(stats.min_value.unwrap(), "Andrew Lamb".as_bytes());
4372    }
4373
4374    #[test]
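        // `set_statistics_truncate_length(Some(2))` limits page header min/max values
        // to 2 bytes: the min is truncated, the max has its last byte incremented so
        // it still upper-bounds the data, and both are marked inexact.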
4375    fn test_page_stats_truncation() {
4376        let string_field = Field::new("a", DataType::Utf8, false);
4377        let binary_field = Field::new("b", DataType::Binary, false);
4378        let schema = Schema::new(vec![string_field, binary_field]);
4379
4380        let raw_string_values = vec!["Blart Versenwald III"];
4381        let raw_binary_values = [b"Blart Versenwald III".to_vec()];
4382        let raw_binary_value_refs = raw_binary_values
4383            .iter()
4384            .map(|x| x.as_slice())
4385            .collect::<Vec<_>>();
4386
4387        let string_values = StringArray::from(raw_string_values.clone());
4388        let binary_values = BinaryArray::from(raw_binary_value_refs);
4389        let batch = RecordBatch::try_new(
4390            Arc::new(schema),
4391            vec![Arc::new(string_values), Arc::new(binary_values)],
4392        )
4393        .unwrap();
4394
4395        let props = WriterProperties::builder()
4396            .set_statistics_truncate_length(Some(2))
4397            .set_dictionary_enabled(false)
4398            .set_encoding(Encoding::PLAIN)
4399            .set_write_page_header_statistics(true)
4400            .set_compression(crate::basic::Compression::UNCOMPRESSED)
4401            .build();
4402
4403        let file = roundtrip_opts(&batch, props);
4404
4405        // read file and decode page headers
4406        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
4407
4408        // decode first page header
4409        let first_page = &file[4..];
4410        let mut prot = ThriftSliceInputProtocol::new(first_page);
4411        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
4412        let stats = hdr.data_page_header.unwrap().statistics;
4413        assert!(stats.is_some());
4414        let stats = stats.unwrap();
4415        // check that min/max were properly truncated
4416        assert!(!stats.is_max_value_exact.unwrap());
4417        assert!(!stats.is_min_value_exact.unwrap());
4418        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
4419        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
4420
4421        // check the second page (column "b"'s data page) now
4422        let second_page = &prot.as_slice()[hdr.compressed_page_size as usize..];
4423        let mut prot = ThriftSliceInputProtocol::new(second_page);
4424        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
4425        let stats = hdr.data_page_header.unwrap().statistics;
4426        assert!(stats.is_some());
4427        let stats = stats.unwrap();
4428        // check that min/max were properly truncated
4429        assert!(!stats.is_max_value_exact.unwrap());
4430        assert!(!stats.is_min_value_exact.unwrap());
4431        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
4432        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
4433    }
4434
4435    #[test]
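        // Page encoding statistics reported in the writer's returned metadata should
        // match what a reader decodes back from the file.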
4436    fn test_page_encoding_statistics_roundtrip() {
4437        let batch_schema = Schema::new(vec![Field::new(
4438            "int32",
4439            arrow_schema::DataType::Int32,
4440            false,
4441        )]);
4442
4443        let batch = RecordBatch::try_new(
4444            Arc::new(batch_schema.clone()),
4445            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4446        )
4447        .unwrap();
4448
4449        let mut file: File = tempfile::tempfile().unwrap();
4450        let mut writer = ArrowWriter::try_new(&mut file, Arc::new(batch_schema), None).unwrap();
4451        writer.write(&batch).unwrap();
4452        let file_metadata = writer.close().unwrap();
4453
4454        assert_eq!(file_metadata.num_row_groups(), 1);
4455        assert_eq!(file_metadata.row_group(0).num_columns(), 1);
4456        assert!(
4457            file_metadata
4458                .row_group(0)
4459                .column(0)
4460                .page_encoding_stats()
4461                .is_some()
4462        );
4463        let chunk_page_stats = file_metadata
4464            .row_group(0)
4465            .column(0)
4466            .page_encoding_stats()
4467            .unwrap();
4468
4469        // check that the read metadata is also correct
4470        let options = ReadOptionsBuilder::new()
4471            .with_page_index()
4472            .with_encoding_stats_as_mask(false)
4473            .build();
4474        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
4475
4476        let rowgroup = reader.get_row_group(0).expect("row group missing");
4477        assert_eq!(rowgroup.num_columns(), 1);
4478        let column = rowgroup.metadata().column(0);
4479        assert!(column.page_encoding_stats().is_some());
4480        let file_page_stats = column.page_encoding_stats().unwrap();
4481        assert_eq!(chunk_page_stats, file_page_stats);
4482    }
4483
4484    #[test]
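        // A per-column dictionary page size limit overrides the file-wide default:
        // col0's dictionary page is capped at 1 MiB while col1's can grow to 4 MiB.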
4485    fn test_different_dict_page_size_limit() {
4486        let array = Arc::new(Int64Array::from_iter(0..1024 * 1024));
4487        let schema = Arc::new(Schema::new(vec![
4488            Field::new("col0", arrow_schema::DataType::Int64, false),
4489            Field::new("col1", arrow_schema::DataType::Int64, false),
4490        ]));
4491        let batch =
4492            arrow_array::RecordBatch::try_new(schema.clone(), vec![array.clone(), array]).unwrap();
4493
4494        let props = WriterProperties::builder()
4495            .set_dictionary_page_size_limit(1024 * 1024)
4496            .set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 1024 * 1024 * 4)
4497            .build();
4498        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
4499        writer.write(&batch).unwrap();
4500        let data = Bytes::from(writer.into_inner().unwrap());
4501
4502        let mut metadata = ParquetMetaDataReader::new();
4503        metadata.try_parse(&data).unwrap();
4504        let metadata = metadata.finish().unwrap();
4505        let col0_meta = metadata.row_group(0).column(0);
4506        let col1_meta = metadata.row_group(0).column(1);
4507
4508        let get_dict_page_size = move |meta: &ColumnChunkMetaData| {
4509            let mut reader =
4510                SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
4511            let page = reader.get_next_page().unwrap().unwrap();
4512            match page {
4513                Page::DictionaryPage { buf, .. } => buf.len(),
4514                _ => panic!("expected DictionaryPage"),
4515            }
4516        };
4517
4518        assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
4519        assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
4520    }
4521}