Skip to main content

parquet/arrow/arrow_writer/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains writer which writes arrow data into parquet data.
19
20use crate::column::chunker::ContentDefinedChunker;
21
22use bytes::Bytes;
23use std::io::{Read, Write};
24use std::slice::Iter;
25use std::sync::{Arc, Mutex};
26use std::vec::IntoIter;
27
28use arrow_array::cast::AsArray;
29use arrow_array::types::*;
30use arrow_array::{ArrayRef, Int32Array, RecordBatch, RecordBatchWriter};
31use arrow_schema::{
32    ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef, TimeUnit,
33};
34
35use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};
36
37use crate::arrow::ArrowSchemaConverter;
38use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
39use crate::basic::PageType;
40use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
41use crate::column::page_encryption::PageEncryptor;
42use crate::column::writer::encoder::ColumnValueEncoder;
43use crate::column::writer::{
44    ColumnCloseResult, ColumnWriter, GenericColumnWriter, get_column_writer,
45};
46use crate::data_type::{ByteArray, FixedLenByteArray};
47#[cfg(feature = "encryption")]
48use crate::encryption::encrypt::FileEncryptor;
49use crate::errors::{ParquetError, Result};
50use crate::file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaData};
51use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
52use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
53use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
54use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
55use levels::{ArrayLevels, calculate_array_levels};
56
57mod byte_array;
58mod levels;
59
60#[doc(inline)]
61pub use crate::column::page_store::{
62    InMemoryPageStore, InMemoryPageStoreFactory, PageKey, PageStore, PageStoreArgs,
63    PageStoreFactory,
64};
65
66/// Encodes [`RecordBatch`] to parquet
67///
68/// Writes Arrow `RecordBatch`es to a Parquet writer. Multiple [`RecordBatch`] will be encoded
69/// to the same row group, up to `max_row_group_size` rows. Any remaining rows will be
70/// flushed on close, leading the final row group in the output file to potentially
71/// contain fewer than `max_row_group_size` rows
72///
73/// # Example: Writing `RecordBatch`es
74/// ```
75/// # use std::sync::Arc;
76/// # use bytes::Bytes;
77/// # use arrow_array::{ArrayRef, Int64Array};
78/// # use arrow_array::RecordBatch;
79/// # use parquet::arrow::arrow_writer::ArrowWriter;
80/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
81/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
82/// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
83///
84/// let mut buffer = Vec::new();
85/// let mut writer = ArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
86/// writer.write(&to_write).unwrap();
87/// writer.close().unwrap();
88///
89/// let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap();
90/// let read = reader.next().unwrap().unwrap();
91///
92/// assert_eq!(to_write, read);
93/// ```
94///
95/// # Memory Usage and Limiting
96///
97/// The nature of Parquet requires buffering of an entire row group before it can
98/// be flushed to the underlying writer. Data is mostly buffered in its encoded
99/// form, reducing memory usage. However, some data such as dictionary keys,
100/// large strings or very nested data may still result in non-trivial memory
101/// usage.
102///
103/// See Also:
104/// * [`ArrowWriter::memory_size`]: the current memory usage of the writer.
105/// * [`ArrowWriter::in_progress_size`]: Estimated size of the buffered row group,
106///
107/// Call [`Self::flush`] to trigger an early flush of a row group based on a
108/// memory threshold and/or global memory pressure. However,  smaller row groups
109/// result in higher metadata overheads, and thus may worsen compression ratios
110/// and query performance.
111///
112/// ```no_run
113/// # use std::io::Write;
114/// # use arrow_array::RecordBatch;
115/// # use parquet::arrow::ArrowWriter;
116/// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
117/// # let batch: RecordBatch = todo!();
118/// writer.write(&batch).unwrap();
119/// // Trigger an early flush if anticipated size exceeds 1_000_000
120/// if writer.in_progress_size() > 1_000_000 {
121///     writer.flush().unwrap();
122/// }
123/// ```
124///
125/// ## Type Support
126///
127/// The writer supports writing all Arrow [`DataType`]s that have a direct mapping to
128/// Parquet types including  [`StructArray`] and [`ListArray`].
129///
130/// The following are not supported:
131///
132/// * [`IntervalMonthDayNanoArray`]: Parquet does not [support nanosecond intervals].
133///
134/// [`DataType`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html
135/// [`StructArray`]: https://docs.rs/arrow/latest/arrow/array/struct.StructArray.html
136/// [`ListArray`]: https://docs.rs/arrow/latest/arrow/array/type.ListArray.html
137/// [`IntervalMonthDayNanoArray`]: https://docs.rs/arrow/latest/arrow/array/type.IntervalMonthDayNanoArray.html
138/// [support nanosecond intervals]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
139///
140/// ## Type Compatibility
141/// The writer can write Arrow [`RecordBatch`]s that are logically equivalent. This means that for
142/// a  given column, the writer can accept multiple Arrow [`DataType`]s that contain the same
143/// value type.
144///
145/// For example, the following [`DataType`]s are all logically equivalent and can be written
146/// to the same column:
147/// * String, LargeString, StringView
148/// * Binary, LargeBinary, BinaryView
149///
150/// The writer can will also accept both native and dictionary encoded arrays if the dictionaries
151/// contain compatible values.
152/// ```
153/// # use std::sync::Arc;
154/// # use arrow_array::{DictionaryArray, LargeStringArray, RecordBatch, StringArray, UInt8Array};
155/// # use arrow_schema::{DataType, Field, Schema};
156/// # use parquet::arrow::arrow_writer::ArrowWriter;
157/// let record_batch1 = RecordBatch::try_new(
158///    Arc::new(Schema::new(vec![Field::new("col", DataType::LargeUtf8, false)])),
159///    vec![Arc::new(LargeStringArray::from_iter_values(vec!["a", "b"]))]
160///  )
161/// .unwrap();
162///
163/// let mut buffer = Vec::new();
164/// let mut writer = ArrowWriter::try_new(&mut buffer, record_batch1.schema(), None).unwrap();
165/// writer.write(&record_batch1).unwrap();
166///
167/// let record_batch2 = RecordBatch::try_new(
168///     Arc::new(Schema::new(vec![Field::new(
169///         "col",
170///         DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
171///          false,
172///     )])),
173///     vec![Arc::new(DictionaryArray::new(
174///          UInt8Array::from_iter_values(vec![0, 1]),
175///          Arc::new(StringArray::from_iter_values(vec!["b", "c"])),
176///      ))],
177///  )
178///  .unwrap();
179///  writer.write(&record_batch2).unwrap();
180///  writer.close();
181/// ```
182pub struct ArrowWriter<W: Write> {
183    /// Underlying Parquet writer
184    writer: SerializedFileWriter<W>,
185
186    /// The in-progress row group if any
187    in_progress: Option<ArrowRowGroupWriter>,
188
189    /// A copy of the Arrow schema.
190    ///
191    /// The schema is used to verify that each record batch written has the correct schema
192    arrow_schema: SchemaRef,
193
194    /// Creates new [`ArrowRowGroupWriter`] instances as required
195    row_group_writer_factory: ArrowRowGroupWriterFactory,
196
197    /// The maximum number of rows to write to each row group, or None for unlimited
198    max_row_group_row_count: Option<usize>,
199
200    /// The maximum size in bytes for a row group, or None for unlimited
201    max_row_group_bytes: Option<usize>,
202
203    /// CDC chunkers persisted across row groups (one per leaf column).
204    cdc_chunkers: Option<Vec<ContentDefinedChunker>>,
205}
206
207impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
208    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209        let buffered_memory = self.in_progress_size();
210        f.debug_struct("ArrowWriter")
211            .field("writer", &self.writer)
212            .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
213            .field("in_progress_rows", &self.in_progress_rows())
214            .field("arrow_schema", &self.arrow_schema)
215            .field("max_row_group_row_count", &self.max_row_group_row_count)
216            .field("max_row_group_bytes", &self.max_row_group_bytes)
217            .finish()
218    }
219}
220
221impl<W: Write + Send> ArrowWriter<W> {
222    /// Try to create a new Arrow writer
223    ///
224    /// The writer will fail if:
225    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
226    ///  * the Arrow schema contains unsupported datatypes such as Unions
227    pub fn try_new(
228        writer: W,
229        arrow_schema: SchemaRef,
230        props: Option<WriterProperties>,
231    ) -> Result<Self> {
232        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
233        Self::try_new_with_options(writer, arrow_schema, options)
234    }
235
236    /// Try to create a new Arrow writer with [`ArrowWriterOptions`].
237    ///
238    /// The writer will fail if:
239    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
240    ///  * the Arrow schema contains unsupported datatypes such as Unions
241    pub fn try_new_with_options(
242        writer: W,
243        arrow_schema: SchemaRef,
244        options: ArrowWriterOptions,
245    ) -> Result<Self> {
246        let mut props = options.properties;
247
248        let schema = if let Some(parquet_schema) = options.schema_descr {
249            parquet_schema.clone()
250        } else {
251            let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
252            if let Some(schema_root) = &options.schema_root {
253                converter = converter.schema_root(schema_root);
254            }
255
256            converter.convert(&arrow_schema)?
257        };
258
259        if !options.skip_arrow_metadata {
260            // add serialized arrow schema
261            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
262        }
263
264        let max_row_group_row_count = props.max_row_group_row_count();
265        let max_row_group_bytes = props.max_row_group_bytes();
266
267        let props_ptr = Arc::new(props);
268        let file_writer =
269            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;
270
271        let mut row_group_writer_factory =
272            ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone());
273        if let Some(page_store_factory) = options.page_store_factory {
274            row_group_writer_factory =
275                row_group_writer_factory.with_page_store_factory(page_store_factory);
276        }
277
278        let cdc_chunkers = props_ptr
279            .content_defined_chunking()
280            .map(|opts| {
281                file_writer
282                    .schema_descr()
283                    .columns()
284                    .iter()
285                    .map(|desc| ContentDefinedChunker::new(desc, opts))
286                    .collect::<Result<Vec<_>>>()
287            })
288            .transpose()?;
289
290        Ok(Self {
291            writer: file_writer,
292            in_progress: None,
293            arrow_schema,
294            row_group_writer_factory,
295            max_row_group_row_count,
296            max_row_group_bytes,
297            cdc_chunkers,
298        })
299    }
300
301    /// Returns metadata for any flushed row groups
302    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
303        self.writer.flushed_row_groups()
304    }
305
306    /// Estimated memory usage, in bytes, of this `ArrowWriter`
307    ///
308    /// This estimate is formed bu summing the values of
309    /// [`ArrowColumnWriter::memory_size`] all in progress columns.
310    pub fn memory_size(&self) -> usize {
311        match &self.in_progress {
312            Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
313            None => 0,
314        }
315    }
316
317    /// Anticipated encoded size of the in progress row group.
318    ///
319    /// This estimate the row group size after being completely encoded is,
320    /// formed by summing the values of
321    /// [`ArrowColumnWriter::get_estimated_total_bytes`] for all in progress
322    /// columns.
323    pub fn in_progress_size(&self) -> usize {
324        match &self.in_progress {
325            Some(in_progress) => in_progress
326                .writers
327                .iter()
328                .map(|x| x.get_estimated_total_bytes())
329                .sum(),
330            None => 0,
331        }
332    }
333
334    /// Returns the number of rows buffered in the in progress row group
335    pub fn in_progress_rows(&self) -> usize {
336        self.in_progress
337            .as_ref()
338            .map(|x| x.buffered_rows)
339            .unwrap_or_default()
340    }
341
342    /// Returns the number of bytes written by this instance
343    pub fn bytes_written(&self) -> usize {
344        self.writer.bytes_written()
345    }
346
347    /// Encodes the provided [`RecordBatch`]
348    ///
349    /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_row_count`]
350    /// rows or [`WriterProperties::max_row_group_bytes`] bytes, the contents of `batch` will be
351    /// written to one or more row groups such that limits are respected.
352    ///
353    /// If both limits are `None`, all data is written to a single row group.
354    /// If one limit is set, that limit is respected.
355    /// If both limits are set, the lower bound (whichever triggers first) is respected.
356    ///
357    /// This will fail if the `batch`'s schema does not match the writer's schema.
358    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
359        if batch.num_rows() == 0 {
360            return Ok(());
361        }
362
363        let in_progress = match &mut self.in_progress {
364            Some(in_progress) => in_progress,
365            x => x.insert(
366                self.row_group_writer_factory
367                    .create_row_group_writer(self.writer.flushed_row_groups().len())?,
368            ),
369        };
370
371        if let Some(max_rows) = self.max_row_group_row_count {
372            if in_progress.buffered_rows + batch.num_rows() > max_rows {
373                let to_write = max_rows - in_progress.buffered_rows;
374                let a = batch.slice(0, to_write);
375                let b = batch.slice(to_write, batch.num_rows() - to_write);
376                self.write(&a)?;
377                return self.write(&b);
378            }
379        }
380
381        // Check byte limit: if we have buffered data, use measured average row size
382        // to split batch proactively before exceeding byte limit
383        if let Some(max_bytes) = self.max_row_group_bytes {
384            if in_progress.buffered_rows > 0 {
385                let current_bytes = in_progress.get_estimated_total_bytes();
386
387                if current_bytes >= max_bytes {
388                    self.flush()?;
389                    return self.write(batch);
390                }
391
392                if let Some(avg_row_bytes) = current_bytes
393                    .checked_div(in_progress.buffered_rows)
394                    .filter(|avg_row_bytes| *avg_row_bytes > 0)
395                {
396                    // At this point, `current_bytes < max_bytes` (checked above)
397                    let remaining_bytes = max_bytes - current_bytes;
398                    let rows_that_fit = remaining_bytes.checked_div(avg_row_bytes).unwrap_or(0);
399
400                    if batch.num_rows() > rows_that_fit {
401                        if rows_that_fit > 0 {
402                            let a = batch.slice(0, rows_that_fit);
403                            let b = batch.slice(rows_that_fit, batch.num_rows() - rows_that_fit);
404                            self.write(&a)?;
405                            return self.write(&b);
406                        } else {
407                            self.flush()?;
408                            return self.write(batch);
409                        }
410                    }
411                }
412            }
413        }
414
415        match self.cdc_chunkers.as_mut() {
416            Some(chunkers) => in_progress.write_with_chunkers(batch, chunkers)?,
417            None => in_progress.write(batch)?,
418        }
419
420        let should_flush = self
421            .max_row_group_row_count
422            .is_some_and(|max| in_progress.buffered_rows >= max)
423            || self
424                .max_row_group_bytes
425                .is_some_and(|max| in_progress.get_estimated_total_bytes() >= max);
426
427        if should_flush {
428            self.flush()?
429        }
430        Ok(())
431    }
432
433    /// Writes the given buf bytes to the internal buffer.
434    ///
435    /// It's safe to use this method to write data to the underlying writer,
436    /// because it will ensure that the buffering and byte‐counting layers are used.
437    pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
438        self.writer.write_all(buf)
439    }
440
441    /// Flushes underlying writer
442    pub fn sync(&mut self) -> std::io::Result<()> {
443        self.writer.flush()
444    }
445
446    /// Flushes all buffered rows into a new row group
447    ///
448    /// Note the underlying writer is not flushed with this call.
449    /// If this is a desired behavior, please call [`ArrowWriter::sync`].
450    pub fn flush(&mut self) -> Result<()> {
451        let in_progress = match self.in_progress.take() {
452            Some(in_progress) => in_progress,
453            None => return Ok(()),
454        };
455
456        let mut row_group_writer = self.writer.next_row_group()?;
457        for chunk in in_progress.close()? {
458            chunk.append_to_row_group(&mut row_group_writer)?;
459        }
460        row_group_writer.close()?;
461        Ok(())
462    }
463
464    /// Additional [`KeyValue`] metadata to be written in addition to those from [`WriterProperties`]
465    ///
466    /// This method provide a way to append kv_metadata after write RecordBatch
467    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
468        self.writer.append_key_value_metadata(kv_metadata)
469    }
470
471    /// Returns a reference to the underlying writer.
472    pub fn inner(&self) -> &W {
473        self.writer.inner()
474    }
475
476    /// Returns a mutable reference to the underlying writer.
477    ///
478    /// **Warning**: if you write directly to this writer, you will skip
479    /// the `TrackedWrite` buffering and byte‐counting layers. That’ll cause
480    /// the file footer’s recorded offsets and sizes to diverge from reality,
481    /// resulting in an unreadable or corrupted Parquet file.
482    ///
483    /// If you want to write safely to the underlying writer, use [`Self::write_all`].
484    pub fn inner_mut(&mut self) -> &mut W {
485        self.writer.inner_mut()
486    }
487
488    /// Flushes any outstanding data and returns the underlying writer.
489    pub fn into_inner(mut self) -> Result<W> {
490        self.flush()?;
491        self.writer.into_inner()
492    }
493
494    /// Close and finalize the underlying Parquet writer
495    ///
496    /// Unlike [`Self::close`] this does not consume self
497    ///
498    /// Attempting to write after calling finish will result in an error
499    pub fn finish(&mut self) -> Result<ParquetMetaData> {
500        self.flush()?;
501        self.writer.finish()
502    }
503
504    /// Close and finalize the underlying Parquet writer
505    pub fn close(mut self) -> Result<ParquetMetaData> {
506        self.finish()
507    }
508
509    /// Create a new row group writer and return its column writers.
510    #[deprecated(
511        since = "56.2.0",
512        note = "Use `ArrowRowGroupWriterFactory` instead, see `ArrowColumnWriter` for an example"
513    )]
514    pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
515        self.flush()?;
516        let in_progress = self
517            .row_group_writer_factory
518            .create_row_group_writer(self.writer.flushed_row_groups().len())?;
519        Ok(in_progress.writers)
520    }
521
522    /// Append the given column chunks to the file as a new row group.
523    #[deprecated(
524        since = "56.2.0",
525        note = "Use `SerializedFileWriter` directly instead, see `ArrowColumnWriter` for an example"
526    )]
527    pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
528        let mut row_group_writer = self.writer.next_row_group()?;
529        for chunk in chunks {
530            chunk.append_to_row_group(&mut row_group_writer)?;
531        }
532        row_group_writer.close()?;
533        Ok(())
534    }
535
536    /// Converts this writer into a lower-level [`SerializedFileWriter`] and [`ArrowRowGroupWriterFactory`].
537    ///
538    /// Flushes any outstanding data before returning.
539    ///
540    /// This can be useful to provide more control over how files are written, for example
541    /// to write columns in parallel. See the example on [`ArrowColumnWriter`].
542    pub fn into_serialized_writer(
543        mut self,
544    ) -> Result<(SerializedFileWriter<W>, ArrowRowGroupWriterFactory)> {
545        self.flush()?;
546        Ok((self.writer, self.row_group_writer_factory))
547    }
548}
549
550impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
551    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
552        self.write(batch).map_err(|e| e.into())
553    }
554
555    fn close(self) -> std::result::Result<(), ArrowError> {
556        self.close()?;
557        Ok(())
558    }
559}
560
561/// Arrow-specific configuration settings for writing parquet files.
562///
563/// See [`ArrowWriter`] for how to configure the writer.
564#[derive(Debug, Clone, Default)]
565pub struct ArrowWriterOptions {
566    properties: WriterProperties,
567    skip_arrow_metadata: bool,
568    schema_root: Option<String>,
569    schema_descr: Option<SchemaDescriptor>,
570    page_store_factory: Option<Arc<dyn PageStoreFactory>>,
571}
572
573impl ArrowWriterOptions {
574    /// Creates a new [`ArrowWriterOptions`] with the default settings.
575    pub fn new() -> Self {
576        Self::default()
577    }
578
579    /// Sets the [`WriterProperties`] for writing parquet files.
580    pub fn with_properties(self, properties: WriterProperties) -> Self {
581        Self { properties, ..self }
582    }
583
584    /// Sets the [`PageStoreFactory`] used to buffer completed pages while a row
585    /// group is being written.
586    ///
587    /// The default implementation ([`InMemoryPageStore`]) buffers all completed
588    /// pages on the heap until the row group is flushed, so peak write memory
589    /// grows with the row group size. Using this API, pages can be spilled to a
590    /// file or object storage instead, reducing peak write memory substantially
591    /// at the expense of an extra write to and read from secondary storage.
592    ///
593    /// # Example: spilling pages to a temp file
594    ///
595    /// A simple spilling backend uses one temp file per column chunk; `put`
596    /// appends the page and `take` reads it back.
597    ///
598    /// ```
599    /// # use std::fs::File;
600    /// # use std::io::{Read, Seek, SeekFrom, Write};
601    /// # use std::sync::Arc;
602    /// # use bytes::Bytes;
603    /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
604    /// # use parquet::arrow::arrow_writer::{
605    /// #     ArrowWriter, ArrowWriterOptions, PageKey, PageStore, PageStoreArgs, PageStoreFactory,
606    /// # };
607    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
608    /// # use parquet::errors::Result;
609    /// struct TempFilePageStore {
610    ///     file: File,
611    ///     /// Total size of the file
612    ///     end: u64,
613    ///     /// Location of pages: (offset, len)
614    ///     locs: Vec<(u64, usize)>,
615    /// }
616    ///
617    /// impl PageStore for TempFilePageStore {
618    ///     fn put(&mut self, value: Bytes) -> Result<PageKey> {
619    ///         // Append to the end of the file
620    ///         self.file.seek(SeekFrom::Start(self.end))?;
621    ///         self.file.write_all(&value)?;
622    ///         let key = PageKey::new(self.locs.len() as u64);
623    ///         self.locs.push((self.end, value.len()));
624    ///         self.end += value.len() as u64;
625    ///         Ok(key)
626    ///     }
627    ///
628    ///     fn take(&mut self, key: PageKey) -> Result<Bytes> {
629    ///         let (offset, len) = self.locs[key.get() as usize];
630    ///         let mut buf = vec![0u8; len];
631    ///         self.file.seek(SeekFrom::Start(offset))?;
632    ///         self.file.read_exact(&mut buf)?;
633    ///         Ok(Bytes::from(buf))
634    ///     }
635    /// }
636    ///
637    /// /// Factory for creating [`TempFilePageStore`]
638    /// #[derive(Debug)]
639    /// struct TempFilePageStoreFactory;
640    ///
641    /// impl PageStoreFactory for TempFilePageStoreFactory {
642    ///     fn create(&self, args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
643    ///         // `args` exposes the column index and descriptor (physical/logical
644    ///         // type, path), so a real backend might choose to spill only large columns.
645    ///         let _ = (args.column_index(), args.column_descriptor());
646    ///         Ok(Box::new(TempFilePageStore {
647    ///             file: tempfile::tempfile()?, // temp file is cleaned on drop
648    ///             end: 0,
649    ///             locs: Vec::new(),
650    ///         }))
651    ///     }
652    /// }
653    /// // write 1000 integers
654    /// let col = Arc::new(Int64Array::from_iter_values(0..1000)) as ArrayRef;
655    /// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
656    ///
657    /// let options =
658    ///     ArrowWriterOptions::new().with_page_store_factory(Arc::new(TempFilePageStoreFactory));
659    /// let mut buffer = Vec::new();
660    /// let mut writer =
661    ///     ArrowWriter::try_new_with_options(&mut buffer, to_write.schema(), options).unwrap();
662    /// writer.write(&to_write).unwrap();
663    /// writer.close().unwrap();
664    ///
665    /// // buffer now holds valid Parquet data, which can be read as normal:
666    /// let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap();
667    /// assert_eq!(to_write, reader.next().unwrap().unwrap());
668    /// ```
669    pub fn with_page_store_factory(self, page_store_factory: Arc<dyn PageStoreFactory>) -> Self {
670        Self {
671            page_store_factory: Some(page_store_factory),
672            ..self
673        }
674    }
675
676    /// Skip encoding the embedded arrow metadata (defaults to `false`)
677    ///
678    /// Parquet files generated by the [`ArrowWriter`] contain embedded arrow schema
679    /// by default.
680    ///
681    /// Set `skip_arrow_metadata` to true, to skip encoding the embedded metadata.
682    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
683        Self {
684            skip_arrow_metadata,
685            ..self
686        }
687    }
688
689    /// Set the name of the root parquet schema element (defaults to `"arrow_schema"`)
690    pub fn with_schema_root(self, schema_root: String) -> Self {
691        Self {
692            schema_root: Some(schema_root),
693            ..self
694        }
695    }
696
697    /// Explicitly specify the Parquet schema to be used
698    ///
699    /// If omitted (the default), the [`ArrowSchemaConverter`] is used to compute the
700    /// Parquet [`SchemaDescriptor`]. This may be used When the [`SchemaDescriptor`] is
701    /// already known or must be calculated using custom logic.
702    pub fn with_parquet_schema(self, schema_descr: SchemaDescriptor) -> Self {
703        Self {
704            schema_descr: Some(schema_descr),
705            ..self
706        }
707    }
708}
709
710/// A single column chunk produced by [`ArrowColumnWriter`].
711///
712/// Holds the serialized page blobs (each page's header ‖ compressed data, in
713/// write order) in a [`PageStore`], plus the handles needed to read them back,
714/// in order, when the chunk is spliced into the output file.
715struct ArrowColumnChunkData {
716    length: usize,
717    store: Box<dyn PageStore>,
718    keys: Vec<PageKey>,
719    /// Handles to the dictionary page's blobs (header then data) in the store.
720    ///
721    /// A dictionary page is produced at most once and bounded by
722    /// `dict_page_size_limit`, but it must be written *first* in the chunk even
723    /// though the data pages reach the writer before it (see
724    /// [`PageWriter::defers_dictionary_ordering`]). Its header and data are `put`
725    /// into the store like any other page — which keeps the store uniform, and
726    /// lets an oversized dictionary page spill — and their handles are held apart
727    /// so they can be emitted ahead of the data pages at splice.
728    /// Empty for non-dictionary columns.
729    dictionary_keys: Vec<PageKey>,
730    /// Serialized length of the dictionary page (0 if there is none), recorded
731    /// so the data pages can be shifted past it when offsets are rewritten to a
732    /// dictionary-first layout at splice.
733    dictionary_len: usize,
734}
735
736impl ArrowColumnChunkData {
737    fn new(store: Box<dyn PageStore>) -> Self {
738        Self {
739            length: 0,
740            store,
741            keys: Vec::new(),
742            dictionary_keys: Vec::new(),
743            dictionary_len: 0,
744        }
745    }
746
747    /// Append a data-page blob to the store, recording its handle in write
748    /// order.
749    fn push(&mut self, value: Bytes) -> Result<()> {
750        let key = self.store.put(value)?;
751        self.keys.push(key);
752        Ok(())
753    }
754
755    /// Store a dictionary-page blob (header or data) in the page store,
756    /// recording its handle (emitted first at splice) and accumulating its
757    /// serialized length.
758    fn push_dictionary(&mut self, value: Bytes) -> Result<()> {
759        self.dictionary_len += value.len();
760        let key = self.store.put(value)?;
761        self.dictionary_keys.push(key);
762        Ok(())
763    }
764
765    /// Bytes this chunk currently holds on the heap: whatever the store keeps
766    /// resident (zero for a spilling backend).
767    fn memory_size(&self) -> usize {
768        self.store.memory_size()
769    }
770}
771
/// A streaming [`Read`] over one column chunk's buffered pages, in final file
/// order: the dictionary page (if any) first, then the data pages.
///
/// Each blob (a page header or a page body) is taken back out of the
/// [`PageStore`] *as it is consumed* and released once it has been drained,
/// so splicing a chunk into the output file never materializes more than a
/// single blob in memory at a time. This is what keeps the splice phase
/// within the memory bound for a spilling backend (an in-memory store already
/// holds the bytes, so it is unaffected).
struct StreamingColumnChunkReader {
    /// The store the blobs are taken from, one at a time, while reading.
    store: Box<dyn PageStore>,
    /// Page handles in final file order: the dictionary page first (if any),
    /// then the data pages.
    keys: IntoIter<PageKey>,
    /// The blob currently being drained into the output; emptied as it is read.
    current: Bytes,
}
788
789impl StreamingColumnChunkReader {
790    fn new(data: ArrowColumnChunkData) -> Self {
791        // The dictionary page must be emitted first, ahead of the data pages,
792        // even though it was the last page produced.
793        let keys = if data.dictionary_keys.is_empty() {
794            data.keys
795        } else {
796            let mut keys = Vec::with_capacity(data.dictionary_keys.len() + data.keys.len());
797            keys.extend(data.dictionary_keys);
798            keys.extend(data.keys);
799            keys
800        };
801        Self {
802            store: data.store,
803            keys: keys.into_iter(),
804            current: Bytes::new(),
805        }
806    }
807}
808
809impl Read for StreamingColumnChunkReader {
810    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
811        // Refill from the next blob whenever the current one is drained: the
812        // dictionary page first, then each data page, all taken from the store.
813        while self.current.is_empty() {
814            if let Some(key) = self.keys.next() {
815                self.current = self.store.take(key).map_err(std::io::Error::other)?;
816            } else {
817                return Ok(0);
818            }
819        }
820
821        let len = self.current.len().min(out.len());
822        let b = self.current.split_to(len);
823        out[..len].copy_from_slice(&b);
824        Ok(len)
825    }
826}
827
/// A shared [`ArrowColumnChunkData`]
///
/// This allows it to be owned by [`ArrowPageWriter`] whilst still being
/// reachable from [`ArrowColumnWriter`], which unwraps it in
/// [`ArrowColumnWriter::close`] once the page writer is gone, without requiring
/// self-referential borrows.
type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
833
/// A [`PageWriter`] that buffers each completed page of a column chunk in the
/// chunk's [`PageStore`] (through the shared [`ArrowColumnChunkData`]) instead
/// of writing it to the output directly.
struct ArrowPageWriter {
    /// Chunk buffer, shared with the [`ArrowColumnWriter`] that owns this
    /// page writer.
    buffer: SharedColumnChunk,
    /// Encryptor applied to pages and page headers, if any.
    #[cfg(feature = "encryption")]
    page_encryptor: Option<PageEncryptor>,
}
839
840impl ArrowPageWriter {
841    /// Create a page writer that buffers completed pages in `store`.
842    fn new(store: Box<dyn PageStore>) -> Self {
843        Self {
844            buffer: Arc::new(Mutex::new(ArrowColumnChunkData::new(store))),
845            #[cfg(feature = "encryption")]
846            page_encryptor: None,
847        }
848    }
849
850    #[cfg(feature = "encryption")]
851    pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
852        self.page_encryptor = page_encryptor;
853        self
854    }
855
856    #[cfg(feature = "encryption")]
857    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
858        self.page_encryptor.as_mut()
859    }
860
861    #[cfg(not(feature = "encryption"))]
862    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
863        None
864    }
865}
866
867impl PageWriter for ArrowPageWriter {
868    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
869        let page = match self.page_encryptor_mut() {
870            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
871            None => page,
872        };
873
874        let page_header = page.to_thrift_header()?;
875        let header = {
876            let mut header = Vec::with_capacity(1024);
877
878            match self.page_encryptor_mut() {
879                Some(page_encryptor) => {
880                    page_encryptor.encrypt_page_header(&page_header, &mut header)?;
881                    if page.compressed_page().is_data_page() {
882                        page_encryptor.increment_page();
883                    }
884                }
885                None => {
886                    let mut protocol = ThriftCompactOutputProtocol::new(&mut header);
887                    page_header.write_thrift(&mut protocol)?;
888                }
889            };
890
891            Bytes::from(header)
892        };
893
894        let mut buf = self.buffer.try_lock().unwrap();
895
896        let data = page.compressed_page().buffer().clone();
897        let compressed_size = data.len() + header.len();
898
899        let mut spec = PageWriteSpec::new();
900        spec.page_type = page.page_type();
901        spec.num_values = page.num_values();
902        spec.uncompressed_size = page.uncompressed_size() + header.len();
903        spec.offset = buf.length as u64;
904        spec.compressed_size = compressed_size;
905        spec.bytes_written = compressed_size as u64;
906
907        buf.length += compressed_size;
908        if spec.page_type == PageType::DICTIONARY_PAGE {
909            // Recorded apart from the data pages so it is emitted first at
910            // splice — see `ArrowColumnChunkData::dictionary_keys`.
911            buf.push_dictionary(header)?;
912            buf.push_dictionary(data)?;
913        } else {
914            buf.push(header)?;
915            buf.push(data)?;
916        }
917
918        Ok(spec)
919    }
920
921    fn defers_dictionary_ordering(&self) -> bool {
922        // The Arrow chunk is buffered in full and spliced at row-group flush, so
923        // data pages may be accepted before the dictionary page and reordered
924        // then. This lets `GenericColumnWriter` stream dictionary-column data
925        // pages straight through instead of buffering them in memory.
926        true
927    }
928
929    fn buffered_memory_size(&self) -> usize {
930        // Only what is actually resident: a spilling store reports ~0 here even
931        // though the chunk's bytes have all passed through it.
932        self.buffer.try_lock().unwrap().memory_size()
933    }
934
935    fn close(&mut self) -> Result<()> {
936        Ok(())
937    }
938}
939
/// A leaf column that can be encoded by [`ArrowColumnWriter`]
///
/// Produced by [`compute_leaves`]; wraps the levels computed for a single
/// parquet leaf column.
#[derive(Debug)]
pub struct ArrowLeafColumn(ArrayLevels);
943
944/// Computes the [`ArrowLeafColumn`] for a potentially nested [`ArrayRef`]
945///
946/// This function can be used along with [`get_column_writers`] to encode
947/// individual columns in parallel. See example on [`ArrowColumnWriter`]
948pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
949    let levels = calculate_array_levels(array, field)?;
950    Ok(levels.into_iter().map(ArrowLeafColumn).collect())
951}
952
/// The data for a single column chunk, see [`ArrowColumnWriter`]
///
/// Returned by [`ArrowColumnWriter::close`]; written out with
/// [`ArrowColumnChunk::append_to_row_group`].
pub struct ArrowColumnChunk {
    /// The chunk's buffered pages (held in its page store) and their lengths.
    data: ArrowColumnChunkData,
    /// Result produced when the column writer was closed.
    close: ColumnCloseResult,
}
958
959impl std::fmt::Debug for ArrowColumnChunk {
960    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
961        f.debug_struct("ArrowColumnChunk")
962            .field("length", &self.data.length)
963            .finish_non_exhaustive()
964    }
965}
966
967impl ArrowColumnChunk {
968    /// Returns the [`ColumnCloseResult`] produced when the chunk was closed.
969    ///
970    /// Exposes encoding information, collected statistics, and the optional
971    /// [`ColumnIndexMetaData`](crate::file::page_index::column_index::ColumnIndexMetaData)
972    /// / [`OffsetIndexMetaData`](crate::file::page_index::offset_index::OffsetIndexMetaData)
973    /// gathered for the column chunk.
974    pub fn close(&self) -> &ColumnCloseResult {
975        &self.close
976    }
977
978    /// Returns a mutable reference to the [`ColumnCloseResult`].
979    ///
980    /// This allows callers to mutate the close result before the chunk is
981    /// appended to a row group — for example, clearing `column_index` or
982    /// `bloom_filter` based on a dynamic rule that inspects the encodings and
983    /// collected page statistics.
984    pub fn close_mut(&mut self) -> &mut ColumnCloseResult {
985        &mut self.close
986    }
987
988    /// Splices this column's buffered pages into the row group, streaming them
989    /// back out of the [`PageStore`] one page at a time.
990    pub fn append_to_row_group<W: Write + Send>(
991        self,
992        writer: &mut SerializedRowGroupWriter<'_, W>,
993    ) -> Result<()> {
994        let ArrowColumnChunk { data, close } = self;
995
996        // The dictionary page is produced *after* the data pages on this path (so
997        // they can stream straight through) but must be written *first*, so move
998        // it ahead of the data pages in the recorded offsets before the splice.
999        let close = close.update_dictionary_location(data.dictionary_len)?;
1000
1001        let reader = StreamingColumnChunkReader::new(data);
1002        writer.append_column_from_read(reader, close)
1003    }
1004}
1005
/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
///
/// `ArrowColumnWriter` instances can be created using an [`ArrowRowGroupWriterFactory`].
///
/// Note: This is a low-level interface for applications that require
/// fine-grained control of encoding (e.g. encoding using multiple threads);
/// see [`ArrowWriter`] for a higher-level interface.
///
/// # Example: Encoding two Arrow Arrays in Parallel
/// ```
/// // The arrow schema
/// # use std::sync::Arc;
/// # use arrow_array::*;
/// # use arrow_schema::*;
/// # use parquet::arrow::ArrowSchemaConverter;
/// # use parquet::arrow::arrow_writer::{compute_leaves, ArrowColumnChunk, ArrowLeafColumn, ArrowRowGroupWriterFactory};
/// # use parquet::file::properties::WriterProperties;
/// # use parquet::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
/// #
/// let schema = Arc::new(Schema::new(vec![
///     Field::new("i32", DataType::Int32, false),
///     Field::new("f32", DataType::Float32, false),
/// ]));
///
/// // Compute the parquet schema
/// let props = Arc::new(WriterProperties::default());
/// let parquet_schema = ArrowSchemaConverter::new()
///   .with_coerce_types(props.coerce_types())
///   .convert(&schema)
///   .unwrap();
///
/// // Create parquet writer
/// let root_schema = parquet_schema.root_schema_ptr();
/// // write to memory in the example, but this could be a File
/// let mut out = Vec::with_capacity(1024);
/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone())
///   .unwrap();
///
/// // Create a factory for building Arrow column writers
/// let row_group_factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(&schema));
/// // Create column writers for the 0th row group
/// let col_writers = row_group_factory.create_column_writers(0).unwrap();
///
/// // Spawn a worker thread for each column
/// //
/// // Note: This is for demonstration purposes, a thread-pool e.g. rayon or tokio, would be better.
/// // The `map` produces an iterator of type `tuple of (thread handle, send channel)`.
/// let mut workers: Vec<_> = col_writers
///     .into_iter()
///     .map(|mut col_writer| {
///         let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
///         let handle = std::thread::spawn(move || {
///             // receive Arrays to encode via the channel
///             for col in recv {
///                 col_writer.write(&col)?;
///             }
///             // once the input is complete, close the writer
///             // to return the newly created ArrowColumnChunk
///             col_writer.close()
///         });
///         (handle, send)
///     })
///     .collect();
///
/// // Start row group
/// let mut row_group_writer: SerializedRowGroupWriter<'_, _> = writer
///   .next_row_group()
///   .unwrap();
///
/// // Create some example input columns to encode
/// let to_write = vec![
///     Arc::new(Int32Array::from_iter_values([1, 2, 3])) as _,
///     Arc::new(Float32Array::from_iter_values([1., 45., -1.])) as _,
/// ];
///
/// // Send the input columns to the workers
/// let mut worker_iter = workers.iter_mut();
/// for (arr, field) in to_write.iter().zip(&schema.fields) {
///     for leaves in compute_leaves(field, arr).unwrap() {
///         worker_iter.next().unwrap().1.send(leaves).unwrap();
///     }
/// }
///
/// // Wait for the workers to complete encoding, and append
/// // the resulting column chunks to the row group (and the file)
/// for (handle, send) in workers {
///     drop(send); // Drop send side to signal termination
///     // wait for the worker to send the completed chunk
///     let chunk: ArrowColumnChunk = handle.join().unwrap().unwrap();
///     chunk.append_to_row_group(&mut row_group_writer).unwrap();
/// }
/// // Close the row group which writes to the underlying file
/// row_group_writer.close().unwrap();
///
/// let metadata = writer.close().unwrap();
/// assert_eq!(metadata.file_metadata().num_rows(), 3);
/// ```
pub struct ArrowColumnWriter {
    /// The concrete column writer this leaf column is encoded with.
    writer: ArrowColumnWriterImpl,
    /// Handle on the buffer filled by the writer's page writer; unwrapped into
    /// the returned [`ArrowColumnChunk`] by [`ArrowColumnWriter::close`].
    chunk: SharedColumnChunk,
}
1107
1108impl std::fmt::Debug for ArrowColumnWriter {
1109    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1110        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
1111    }
1112}
1113
/// The concrete writer behind an [`ArrowColumnWriter`].
enum ArrowColumnWriterImpl {
    /// [`ByteArrayEncoder`]-based writer, chosen for string / binary leaves
    /// (including views and dictionary-encoded string / binary /
    /// fixed-size-binary values).
    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
    /// Writer obtained from `get_column_writer` for the leaf's descriptor,
    /// used for every other leaf type.
    Column(ColumnWriter<'static>),
}
1118
1119impl ArrowColumnWriter {
1120    /// Write an [`ArrowLeafColumn`]
1121    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
1122        self.write_internal(&col.0)
1123    }
1124
1125    /// Write with content-defined chunking, inserting page flushes at chunk boundaries.
1126    fn write_with_chunker(
1127        &mut self,
1128        col: &ArrowLeafColumn,
1129        chunker: &mut ContentDefinedChunker,
1130    ) -> Result<()> {
1131        let levels = &col.0;
1132        let chunks = chunker.get_arrow_chunks(
1133            levels.def_level_data().as_ref(),
1134            levels.rep_level_data().as_ref(),
1135            levels.array(),
1136        )?;
1137
1138        let num_chunks = chunks.len();
1139        for (i, chunk) in chunks.iter().enumerate() {
1140            let chunk_levels = levels.slice_for_chunk(chunk);
1141            self.write_internal(&chunk_levels)?;
1142
1143            // Add a page break after each chunk except the last
1144            if i + 1 < num_chunks {
1145                match &mut self.writer {
1146                    ArrowColumnWriterImpl::Column(c) => c.add_data_page()?,
1147                    ArrowColumnWriterImpl::ByteArray(c) => c.add_data_page()?,
1148                }
1149            }
1150        }
1151        Ok(())
1152    }
1153
1154    fn write_internal(&mut self, levels: &ArrayLevels) -> Result<()> {
1155        match &mut self.writer {
1156            ArrowColumnWriterImpl::Column(c) => {
1157                let leaf = levels.array();
1158                match leaf.as_any_dictionary_opt() {
1159                    Some(dictionary) => {
1160                        let materialized =
1161                            arrow_select::take::take(dictionary.values(), dictionary.keys(), None)?;
1162                        write_leaf(c, &materialized, levels)?
1163                    }
1164                    None => write_leaf(c, leaf, levels)?,
1165                };
1166            }
1167            ArrowColumnWriterImpl::ByteArray(c) => {
1168                write_primitive(c, levels.array().as_ref(), levels)?;
1169            }
1170        }
1171        Ok(())
1172    }
1173
1174    /// Close this column returning the written [`ArrowColumnChunk`]
1175    pub fn close(self) -> Result<ArrowColumnChunk> {
1176        let close = match self.writer {
1177            ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
1178            ArrowColumnWriterImpl::Column(c) => c.close()?,
1179        };
1180        let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
1181        let data = chunk.into_inner().unwrap();
1182        Ok(ArrowColumnChunk { data, close })
1183    }
1184
1185    /// Returns the estimated total memory usage by the writer.
1186    ///
1187    /// This  [`Self::get_estimated_total_bytes`] this is an estimate
1188    /// of the current memory usage and not it's anticipated encoded size.
1189    ///
1190    /// This includes:
1191    /// 1. Data buffered in encoded form
1192    /// 2. Data buffered in un-encoded form (e.g. `usize` dictionary keys)
1193    ///
1194    /// This value should be greater than or equal to [`Self::get_estimated_total_bytes`]
1195    pub fn memory_size(&self) -> usize {
1196        match &self.writer {
1197            ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
1198            ArrowColumnWriterImpl::Column(c) => c.memory_size(),
1199        }
1200    }
1201
1202    /// Returns the estimated total encoded bytes for this column writer.
1203    ///
1204    /// This includes:
1205    /// 1. Data buffered in encoded form
1206    /// 2. An estimate of how large the data buffered in un-encoded form would be once encoded
1207    ///
1208    /// This value should be less than or equal to [`Self::memory_size`]
1209    pub fn get_estimated_total_bytes(&self) -> usize {
1210        match &self.writer {
1211            ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
1212            ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
1213        }
1214    }
1215}
1216
/// Encodes [`RecordBatch`] to a parquet row group
///
/// Note: this type is internal. It is created by [`ArrowRowGroupWriterFactory`]
/// and is not exposed publicly.
///
/// See the example on [`ArrowColumnWriter`] for how to encode columns in parallel
#[derive(Debug)]
struct ArrowRowGroupWriter {
    /// One writer per parquet leaf column, consumed in schema order.
    writers: Vec<ArrowColumnWriter>,
    /// Arrow schema of the batches written to this row group.
    schema: SchemaRef,
    /// Total number of rows handed to this row group so far.
    buffered_rows: usize,
}
1229
1230impl ArrowRowGroupWriter {
1231    fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
1232        Self {
1233            writers,
1234            schema: arrow.clone(),
1235            buffered_rows: 0,
1236        }
1237    }
1238
1239    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
1240        self.buffered_rows += batch.num_rows();
1241        let mut writers = self.writers.iter_mut();
1242        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
1243            for leaf in compute_leaves(field.as_ref(), column)? {
1244                writers.next().unwrap().write(&leaf)?;
1245            }
1246        }
1247        Ok(())
1248    }
1249
1250    fn write_with_chunkers(
1251        &mut self,
1252        batch: &RecordBatch,
1253        chunkers: &mut [ContentDefinedChunker],
1254    ) -> Result<()> {
1255        self.buffered_rows += batch.num_rows();
1256        let mut writers = self.writers.iter_mut();
1257        let mut chunkers = chunkers.iter_mut();
1258        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
1259            for leaf in compute_leaves(field.as_ref(), column)? {
1260                writers
1261                    .next()
1262                    .unwrap()
1263                    .write_with_chunker(&leaf, chunkers.next().unwrap())?;
1264            }
1265        }
1266        Ok(())
1267    }
1268
1269    /// Returns the estimated total encoded bytes for this row group
1270    fn get_estimated_total_bytes(&self) -> usize {
1271        self.writers
1272            .iter()
1273            .map(|x| x.get_estimated_total_bytes())
1274            .sum()
1275    }
1276
1277    fn close(self) -> Result<Vec<ArrowColumnChunk>> {
1278        self.writers
1279            .into_iter()
1280            .map(|writer| writer.close())
1281            .collect()
1282    }
1283}
1284
/// Factory that creates new column writers for each row group in the Parquet file.
///
/// You can create this structure via [`ArrowWriter::into_serialized_writer`],
/// or directly with [`ArrowRowGroupWriterFactory::new`].
/// See the example on [`ArrowColumnWriter`] for how to encode columns in parallel
#[derive(Debug)]
pub struct ArrowRowGroupWriterFactory {
    /// Parquet schema whose leaf columns the writers are created for.
    schema: SchemaDescPtr,
    /// Arrow schema of the data being written.
    arrow_schema: SchemaRef,
    /// Writer properties shared by every column writer.
    props: WriterPropertiesPtr,
    /// Allocates the page store backing each column chunk.
    page_store_factory: Arc<dyn PageStoreFactory>,
    /// File encryptor taken from the file writer, if any.
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
1298
1299impl ArrowRowGroupWriterFactory {
1300    /// Create a new [`ArrowRowGroupWriterFactory`] for the provided file writer and Arrow schema
1301    pub fn new<W: Write + Send>(
1302        file_writer: &SerializedFileWriter<W>,
1303        arrow_schema: SchemaRef,
1304    ) -> Self {
1305        let schema = Arc::clone(file_writer.schema_descr_ptr());
1306        let props = Arc::clone(file_writer.properties());
1307        Self {
1308            schema,
1309            arrow_schema,
1310            props,
1311            page_store_factory: Arc::new(InMemoryPageStoreFactory),
1312            #[cfg(feature = "encryption")]
1313            file_encryptor: file_writer.file_encryptor(),
1314        }
1315    }
1316
1317    /// Set the [`PageStoreFactory`] used to allocate the buffer for each column
1318    /// chunk, e.g. to spill completed pages to a temp file or object storage
1319    /// instead of the heap. Defaults to [`InMemoryPageStoreFactory`].
1320    pub fn with_page_store_factory(
1321        mut self,
1322        page_store_factory: Arc<dyn PageStoreFactory>,
1323    ) -> Self {
1324        self.page_store_factory = page_store_factory;
1325        self
1326    }
1327
1328    fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
1329        let writers = self.create_column_writers(row_group_index)?;
1330        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
1331    }
1332
1333    /// Create column writers for a new row group, with the given row group index
1334    pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
1335        let mut writers = Vec::with_capacity(self.arrow_schema.fields.len());
1336        let mut leaves = self.schema.columns().iter();
1337        let column_factory = self.column_writer_factory(row_group_index);
1338        for field in &self.arrow_schema.fields {
1339            column_factory.get_arrow_column_writer(
1340                field.data_type(),
1341                &self.props,
1342                &mut leaves,
1343                &mut writers,
1344            )?;
1345        }
1346        Ok(writers)
1347    }
1348
1349    #[cfg(feature = "encryption")]
1350    fn column_writer_factory(&self, row_group_idx: usize) -> ArrowColumnWriterFactory {
1351        ArrowColumnWriterFactory::new()
1352            .with_page_store_factory(self.page_store_factory.clone())
1353            .with_file_encryptor(row_group_idx, self.file_encryptor.clone())
1354    }
1355
1356    #[cfg(not(feature = "encryption"))]
1357    fn column_writer_factory(&self, _row_group_idx: usize) -> ArrowColumnWriterFactory {
1358        ArrowColumnWriterFactory::new().with_page_store_factory(self.page_store_factory.clone())
1359    }
1360}
1361
1362/// Returns [`ArrowColumnWriter`]s for each column in a given schema
1363#[deprecated(since = "57.0.0", note = "Use `ArrowRowGroupWriterFactory` instead")]
1364pub fn get_column_writers(
1365    parquet: &SchemaDescriptor,
1366    props: &WriterPropertiesPtr,
1367    arrow: &SchemaRef,
1368) -> Result<Vec<ArrowColumnWriter>> {
1369    let mut writers = Vec::with_capacity(arrow.fields.len());
1370    let mut leaves = parquet.columns().iter();
1371    let column_factory = ArrowColumnWriterFactory::new();
1372    for field in &arrow.fields {
1373        column_factory.get_arrow_column_writer(
1374            field.data_type(),
1375            props,
1376            &mut leaves,
1377            &mut writers,
1378        )?;
1379    }
1380    Ok(writers)
1381}
1382
/// Creates [`ArrowColumnWriter`] instances
struct ArrowColumnWriterFactory {
    /// Allocates the per-column-chunk [`PageStore`] backing each page writer.
    page_store_factory: Arc<dyn PageStoreFactory>,
    /// Index of the row group being written; passed to
    /// `PageEncryptor::create_if_column_encrypted`.
    #[cfg(feature = "encryption")]
    row_group_index: usize,
    /// File encryptor, if any; passed to
    /// `PageEncryptor::create_if_column_encrypted` for each column.
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
1392
1393impl ArrowColumnWriterFactory {
1394    pub fn new() -> Self {
1395        Self {
1396            page_store_factory: Arc::new(InMemoryPageStoreFactory),
1397            #[cfg(feature = "encryption")]
1398            row_group_index: 0,
1399            #[cfg(feature = "encryption")]
1400            file_encryptor: None,
1401        }
1402    }
1403
1404    /// Use `page_store_factory` to allocate the buffer for each column chunk.
1405    pub fn with_page_store_factory(
1406        mut self,
1407        page_store_factory: Arc<dyn PageStoreFactory>,
1408    ) -> Self {
1409        self.page_store_factory = page_store_factory;
1410        self
1411    }
1412
1413    #[cfg(feature = "encryption")]
1414    pub fn with_file_encryptor(
1415        mut self,
1416        row_group_index: usize,
1417        file_encryptor: Option<Arc<FileEncryptor>>,
1418    ) -> Self {
1419        self.row_group_index = row_group_index;
1420        self.file_encryptor = file_encryptor;
1421        self
1422    }
1423
1424    #[cfg(feature = "encryption")]
1425    fn create_page_writer(
1426        &self,
1427        column_descriptor: &ColumnDescPtr,
1428        column_index: usize,
1429    ) -> Result<Box<ArrowPageWriter>> {
1430        let column_path = column_descriptor.path().string();
1431        let page_encryptor = PageEncryptor::create_if_column_encrypted(
1432            &self.file_encryptor,
1433            self.row_group_index,
1434            column_index,
1435            &column_path,
1436        )?;
1437        let args = PageStoreArgs::new(column_index, column_descriptor);
1438        let store = self.page_store_factory.create(&args)?;
1439        Ok(Box::new(
1440            ArrowPageWriter::new(store).with_encryptor(page_encryptor),
1441        ))
1442    }
1443
1444    #[cfg(not(feature = "encryption"))]
1445    fn create_page_writer(
1446        &self,
1447        column_descriptor: &ColumnDescPtr,
1448        column_index: usize,
1449    ) -> Result<Box<ArrowPageWriter>> {
1450        let args = PageStoreArgs::new(column_index, column_descriptor);
1451        let store = self.page_store_factory.create(&args)?;
1452        Ok(Box::new(ArrowPageWriter::new(store)))
1453    }
1454
1455    /// Gets an [`ArrowColumnWriter`] for the given `data_type`, appending the
1456    /// output ColumnDesc to `leaves` and the column writers to `out`
1457    fn get_arrow_column_writer(
1458        &self,
1459        data_type: &ArrowDataType,
1460        props: &WriterPropertiesPtr,
1461        leaves: &mut Iter<'_, ColumnDescPtr>,
1462        out: &mut Vec<ArrowColumnWriter>,
1463    ) -> Result<()> {
1464        // Instantiate writers for normal columns
1465        let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1466            let page_writer = self.create_page_writer(desc, out.len())?;
1467            let chunk = page_writer.buffer.clone();
1468            let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
1469            Ok(ArrowColumnWriter {
1470                chunk,
1471                writer: ArrowColumnWriterImpl::Column(writer),
1472            })
1473        };
1474
1475        // Instantiate writers for byte arrays (e.g. Utf8,  Binary, etc)
1476        let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1477            let page_writer = self.create_page_writer(desc, out.len())?;
1478            let chunk = page_writer.buffer.clone();
1479            let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
1480            Ok(ArrowColumnWriter {
1481                chunk,
1482                writer: ArrowColumnWriterImpl::ByteArray(writer),
1483            })
1484        };
1485
1486        match data_type {
1487            _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
1488            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
1489                out.push(col(leaves.next().unwrap())?)
1490            }
1491            ArrowDataType::LargeBinary
1492            | ArrowDataType::Binary
1493            | ArrowDataType::Utf8
1494            | ArrowDataType::LargeUtf8
1495            | ArrowDataType::BinaryView
1496            | ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
1497            ArrowDataType::List(f)
1498            | ArrowDataType::LargeList(f)
1499            | ArrowDataType::FixedSizeList(f, _)
1500            | ArrowDataType::ListView(f)
1501            | ArrowDataType::LargeListView(f) => {
1502                self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
1503            }
1504            ArrowDataType::Struct(fields) => {
1505                for field in fields {
1506                    self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
1507                }
1508            }
1509            ArrowDataType::Map(f, _) => match f.data_type() {
1510                ArrowDataType::Struct(f) => {
1511                    self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
1512                    self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
1513                }
1514                _ => unreachable!("invalid map type"),
1515            },
1516            ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1517                ArrowDataType::Utf8
1518                | ArrowDataType::LargeUtf8
1519                | ArrowDataType::Binary
1520                | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
1521                ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
1522                    out.push(bytes(leaves.next().unwrap())?)
1523                }
1524                ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
1525                _ => out.push(col(leaves.next().unwrap())?),
1526            },
1527            ArrowDataType::RunEndEncoded(_, value_field) => {
1528                self.get_arrow_column_writer(value_field.data_type(), props, leaves, out)?
1529            }
1530            _ => {
1531                return Err(ParquetError::NYI(format!(
1532                    "Attempting to write an Arrow type {data_type} to parquet that is not yet implemented"
1533                )));
1534            }
1535        }
1536        Ok(())
1537    }
1538}
1539
1540fn write_leaf(
1541    writer: &mut ColumnWriter<'_>,
1542    column: &dyn arrow_array::Array,
1543    levels: &ArrayLevels,
1544) -> Result<usize> {
1545    let indices = levels.non_null_indices();
1546
1547    match writer {
1548        // Note: this should match the contents of arrow_to_parquet_type
1549        ColumnWriter::Int32ColumnWriter(typed) => {
1550            match column.data_type() {
1551                ArrowDataType::Null => {
1552                    let array = Int32Array::new_null(column.len());
1553                    write_primitive(typed, array.values(), levels)
1554                }
1555                ArrowDataType::Int8 => {
1556                    let array: Int32Array = column.as_primitive::<Int8Type>().unary(|x| x as i32);
1557                    write_primitive(typed, array.values(), levels)
1558                }
1559                ArrowDataType::Int16 => {
1560                    let array: Int32Array = column.as_primitive::<Int16Type>().unary(|x| x as i32);
1561                    write_primitive(typed, array.values(), levels)
1562                }
1563                ArrowDataType::Int32 => {
1564                    write_primitive(typed, column.as_primitive::<Int32Type>().values(), levels)
1565                }
1566                ArrowDataType::UInt8 => {
1567                    let array: Int32Array = column.as_primitive::<UInt8Type>().unary(|x| x as i32);
1568                    write_primitive(typed, array.values(), levels)
1569                }
1570                ArrowDataType::UInt16 => {
1571                    let array: Int32Array = column.as_primitive::<UInt16Type>().unary(|x| x as i32);
1572                    write_primitive(typed, array.values(), levels)
1573                }
1574                ArrowDataType::UInt32 => {
1575                    // follow C++ implementation and use overflow/reinterpret cast from  u32 to i32 which will map
1576                    // `(i32::MAX as u32)..u32::MAX` to `i32::MIN..0`
1577                    let array = column.as_primitive::<UInt32Type>();
1578                    write_primitive(typed, array.values().inner().typed_data(), levels)
1579                }
1580                ArrowDataType::Date32 => {
1581                    let array = column.as_primitive::<Date32Type>();
1582                    write_primitive(typed, array.values(), levels)
1583                }
1584                ArrowDataType::Time32(TimeUnit::Second) => {
1585                    let array = column.as_primitive::<Time32SecondType>();
1586                    write_primitive(typed, array.values(), levels)
1587                }
1588                ArrowDataType::Time32(TimeUnit::Millisecond) => {
1589                    let array = column.as_primitive::<Time32MillisecondType>();
1590                    write_primitive(typed, array.values(), levels)
1591                }
1592                ArrowDataType::Date64 => {
1593                    // If the column is a Date64, we truncate it
1594                    let array: Int32Array = column
1595                        .as_primitive::<Date64Type>()
1596                        .unary(|x| (x / 86_400_000) as _);
1597
1598                    write_primitive(typed, array.values(), levels)
1599                }
1600                ArrowDataType::Decimal32(_, _) => {
1601                    let array = column
1602                        .as_primitive::<Decimal32Type>()
1603                        .unary::<_, Int32Type>(|v| v);
1604                    write_primitive(typed, array.values(), levels)
1605                }
1606                ArrowDataType::Decimal64(_, _) => {
1607                    // use the int32 to represent the decimal with low precision
1608                    let array = column
1609                        .as_primitive::<Decimal64Type>()
1610                        .unary::<_, Int32Type>(|v| v as i32);
1611                    write_primitive(typed, array.values(), levels)
1612                }
1613                ArrowDataType::Decimal128(_, _) => {
1614                    // use the int32 to represent the decimal with low precision
1615                    let array = column
1616                        .as_primitive::<Decimal128Type>()
1617                        .unary::<_, Int32Type>(|v| v as i32);
1618                    write_primitive(typed, array.values(), levels)
1619                }
1620                ArrowDataType::Decimal256(_, _) => {
1621                    // use the int32 to represent the decimal with low precision
1622                    let array = column
1623                        .as_primitive::<Decimal256Type>()
1624                        .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1625                    write_primitive(typed, array.values(), levels)
1626                }
1627                d => Err(ParquetError::General(format!("Cannot coerce {d} to I32"))),
1628            }
1629        }
1630        ColumnWriter::BoolColumnWriter(typed) => {
1631            let array = column.as_boolean();
1632            let values = get_bool_array_slice(array, indices.iter().copied());
1633            typed.write_batch_internal(
1634                values.as_slice(),
1635                None,
1636                levels.def_level_data().as_ref(),
1637                levels.rep_level_data().as_ref(),
1638                None,
1639                None,
1640                None,
1641            )
1642        }
1643        ColumnWriter::Int64ColumnWriter(typed) => {
1644            match column.data_type() {
1645                ArrowDataType::Date64 => {
1646                    let array = column
1647                        .as_primitive::<Date64Type>()
1648                        .reinterpret_cast::<Int64Type>();
1649
1650                    write_primitive(typed, array.values(), levels)
1651                }
1652                ArrowDataType::Int64 => {
1653                    let array = column.as_primitive::<Int64Type>();
1654                    write_primitive(typed, array.values(), levels)
1655                }
1656                ArrowDataType::UInt64 => {
1657                    let values = column.as_primitive::<UInt64Type>().values();
1658                    // follow C++ implementation and use overflow/reinterpret cast from  u64 to i64 which will map
1659                    // `(i64::MAX as u64)..u64::MAX` to `i64::MIN..0`
1660                    let array = values.inner().typed_data::<i64>();
1661                    write_primitive(typed, array, levels)
1662                }
1663                ArrowDataType::Time64(TimeUnit::Microsecond) => {
1664                    let array = column.as_primitive::<Time64MicrosecondType>();
1665                    write_primitive(typed, array.values(), levels)
1666                }
1667                ArrowDataType::Time64(TimeUnit::Nanosecond) => {
1668                    let array = column.as_primitive::<Time64NanosecondType>();
1669                    write_primitive(typed, array.values(), levels)
1670                }
1671                ArrowDataType::Timestamp(unit, _) => match unit {
1672                    TimeUnit::Second => {
1673                        let array = column.as_primitive::<TimestampSecondType>();
1674                        write_primitive(typed, array.values(), levels)
1675                    }
1676                    TimeUnit::Millisecond => {
1677                        let array = column.as_primitive::<TimestampMillisecondType>();
1678                        write_primitive(typed, array.values(), levels)
1679                    }
1680                    TimeUnit::Microsecond => {
1681                        let array = column.as_primitive::<TimestampMicrosecondType>();
1682                        write_primitive(typed, array.values(), levels)
1683                    }
1684                    TimeUnit::Nanosecond => {
1685                        let array = column.as_primitive::<TimestampNanosecondType>();
1686                        write_primitive(typed, array.values(), levels)
1687                    }
1688                },
1689                ArrowDataType::Duration(unit) => match unit {
1690                    TimeUnit::Second => {
1691                        let array = column.as_primitive::<DurationSecondType>();
1692                        write_primitive(typed, array.values(), levels)
1693                    }
1694                    TimeUnit::Millisecond => {
1695                        let array = column.as_primitive::<DurationMillisecondType>();
1696                        write_primitive(typed, array.values(), levels)
1697                    }
1698                    TimeUnit::Microsecond => {
1699                        let array = column.as_primitive::<DurationMicrosecondType>();
1700                        write_primitive(typed, array.values(), levels)
1701                    }
1702                    TimeUnit::Nanosecond => {
1703                        let array = column.as_primitive::<DurationNanosecondType>();
1704                        write_primitive(typed, array.values(), levels)
1705                    }
1706                },
1707                ArrowDataType::Decimal64(_, _) => {
1708                    let array = column
1709                        .as_primitive::<Decimal64Type>()
1710                        .reinterpret_cast::<Int64Type>();
1711                    write_primitive(typed, array.values(), levels)
1712                }
1713                ArrowDataType::Decimal128(_, _) => {
1714                    // use the int64 to represent the decimal with low precision
1715                    let array = column
1716                        .as_primitive::<Decimal128Type>()
1717                        .unary::<_, Int64Type>(|v| v as i64);
1718                    write_primitive(typed, array.values(), levels)
1719                }
1720                ArrowDataType::Decimal256(_, _) => {
1721                    // use the int64 to represent the decimal with low precision
1722                    let array = column
1723                        .as_primitive::<Decimal256Type>()
1724                        .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1725                    write_primitive(typed, array.values(), levels)
1726                }
1727                d => Err(ParquetError::General(format!("Cannot coerce {d} to I64"))),
1728            }
1729        }
1730        ColumnWriter::Int96ColumnWriter(_typed) => {
1731            unreachable!("Currently unreachable because data type not supported")
1732        }
1733        ColumnWriter::FloatColumnWriter(typed) => {
1734            let array = column.as_primitive::<Float32Type>();
1735            write_primitive(typed, array.values(), levels)
1736        }
1737        ColumnWriter::DoubleColumnWriter(typed) => {
1738            let array = column.as_primitive::<Float64Type>();
1739            write_primitive(typed, array.values(), levels)
1740        }
1741        ColumnWriter::ByteArrayColumnWriter(_) => {
1742            unreachable!("should use ByteArrayWriter")
1743        }
1744        ColumnWriter::FixedLenByteArrayColumnWriter(typed) => {
1745            let bytes = match column.data_type() {
1746                ArrowDataType::Interval(interval_unit) => match interval_unit {
1747                    IntervalUnit::YearMonth => {
1748                        let array = column.as_primitive::<IntervalYearMonthType>();
1749                        get_interval_ym_array_slice(array, indices.iter().copied())
1750                    }
1751                    IntervalUnit::DayTime => {
1752                        let array = column.as_primitive::<IntervalDayTimeType>();
1753                        get_interval_dt_array_slice(array, indices.iter().copied())
1754                    }
1755                    _ => {
1756                        return Err(ParquetError::NYI(format!(
1757                            "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
1758                        )));
1759                    }
1760                },
1761                ArrowDataType::FixedSizeBinary(_) => {
1762                    let array = column.as_fixed_size_binary();
1763                    get_fsb_array_slice(array, indices.iter().copied())
1764                }
1765                ArrowDataType::Decimal32(_, _) => {
1766                    let array = column.as_primitive::<Decimal32Type>();
1767                    get_decimal_32_array_slice(array, indices.iter().copied())
1768                }
1769                ArrowDataType::Decimal64(_, _) => {
1770                    let array = column.as_primitive::<Decimal64Type>();
1771                    get_decimal_64_array_slice(array, indices.iter().copied())
1772                }
1773                ArrowDataType::Decimal128(_, _) => {
1774                    let array = column.as_primitive::<Decimal128Type>();
1775                    get_decimal_128_array_slice(array, indices.iter().copied())
1776                }
1777                ArrowDataType::Decimal256(_, _) => {
1778                    let array = column.as_primitive::<Decimal256Type>();
1779                    get_decimal_256_array_slice(array, indices.iter().copied())
1780                }
1781                ArrowDataType::Float16 => {
1782                    let array = column.as_primitive::<Float16Type>();
1783                    get_float_16_array_slice(array, indices.iter().copied())
1784                }
1785                _ => {
1786                    return Err(ParquetError::NYI(
1787                        "Attempting to write an Arrow type that is not yet implemented".to_string(),
1788                    ));
1789                }
1790            };
1791            typed.write_batch_internal(
1792                bytes.as_slice(),
1793                None,
1794                levels.def_level_data().as_ref(),
1795                levels.rep_level_data().as_ref(),
1796                None,
1797                None,
1798                None,
1799            )
1800        }
1801    }
1802}
1803
1804fn write_primitive<E: ColumnValueEncoder>(
1805    writer: &mut GenericColumnWriter<E>,
1806    values: &E::Values,
1807    levels: &ArrayLevels,
1808) -> Result<usize> {
1809    writer.write_batch_internal(
1810        values,
1811        Some(levels.non_null_indices()),
1812        levels.def_level_data().as_ref(),
1813        levels.rep_level_data().as_ref(),
1814        None,
1815        None,
1816        None,
1817    )
1818}
1819
1820fn get_bool_array_slice(
1821    array: &arrow_array::BooleanArray,
1822    indices: impl ExactSizeIterator<Item = usize>,
1823) -> Vec<bool> {
1824    let mut values = Vec::with_capacity(indices.len());
1825    for i in indices {
1826        values.push(array.value(i))
1827    }
1828    values
1829}
1830
1831/// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each).
1832/// An Arrow YearMonth interval only stores months, thus only the first 4 bytes are populated.
1833fn get_interval_ym_array_slice(
1834    array: &arrow_array::IntervalYearMonthArray,
1835    indices: impl ExactSizeIterator<Item = usize>,
1836) -> Vec<FixedLenByteArray> {
1837    let mut values = Vec::with_capacity(indices.len());
1838    for i in indices {
1839        let mut value = array.value(i).to_le_bytes().to_vec();
1840        let mut suffix = vec![0; 8];
1841        value.append(&mut suffix);
1842        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1843    }
1844    values
1845}
1846
1847/// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each).
1848/// An Arrow DayTime interval only stores days and millis, thus the first 4 bytes are not populated.
1849fn get_interval_dt_array_slice(
1850    array: &arrow_array::IntervalDayTimeArray,
1851    indices: impl ExactSizeIterator<Item = usize>,
1852) -> Vec<FixedLenByteArray> {
1853    let mut values = Vec::with_capacity(indices.len());
1854    for i in indices {
1855        let mut out = [0; 12];
1856        let value = array.value(i);
1857        out[4..8].copy_from_slice(&value.days.to_le_bytes());
1858        out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1859        values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1860    }
1861    values
1862}
1863
1864fn get_decimal_32_array_slice(
1865    array: &arrow_array::Decimal32Array,
1866    indices: impl ExactSizeIterator<Item = usize>,
1867) -> Vec<FixedLenByteArray> {
1868    let mut values = Vec::with_capacity(indices.len());
1869    let size = decimal_length_from_precision(array.precision());
1870    for i in indices {
1871        let as_be_bytes = array.value(i).to_be_bytes();
1872        let resized_value = as_be_bytes[(4 - size)..].to_vec();
1873        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1874    }
1875    values
1876}
1877
1878fn get_decimal_64_array_slice(
1879    array: &arrow_array::Decimal64Array,
1880    indices: impl ExactSizeIterator<Item = usize>,
1881) -> Vec<FixedLenByteArray> {
1882    let mut values = Vec::with_capacity(indices.len());
1883    let size = decimal_length_from_precision(array.precision());
1884    for i in indices {
1885        let as_be_bytes = array.value(i).to_be_bytes();
1886        let resized_value = as_be_bytes[(8 - size)..].to_vec();
1887        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1888    }
1889    values
1890}
1891
1892fn get_decimal_128_array_slice(
1893    array: &arrow_array::Decimal128Array,
1894    indices: impl ExactSizeIterator<Item = usize>,
1895) -> Vec<FixedLenByteArray> {
1896    let mut values = Vec::with_capacity(indices.len());
1897    let size = decimal_length_from_precision(array.precision());
1898    for i in indices {
1899        let as_be_bytes = array.value(i).to_be_bytes();
1900        let resized_value = as_be_bytes[(16 - size)..].to_vec();
1901        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1902    }
1903    values
1904}
1905
1906fn get_decimal_256_array_slice(
1907    array: &arrow_array::Decimal256Array,
1908    indices: impl ExactSizeIterator<Item = usize>,
1909) -> Vec<FixedLenByteArray> {
1910    let mut values = Vec::with_capacity(indices.len());
1911    let size = decimal_length_from_precision(array.precision());
1912    for i in indices {
1913        let as_be_bytes = array.value(i).to_be_bytes();
1914        let resized_value = as_be_bytes[(32 - size)..].to_vec();
1915        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1916    }
1917    values
1918}
1919
1920fn get_float_16_array_slice(
1921    array: &arrow_array::Float16Array,
1922    indices: impl ExactSizeIterator<Item = usize>,
1923) -> Vec<FixedLenByteArray> {
1924    let mut values = Vec::with_capacity(indices.len());
1925    for i in indices {
1926        let value = array.value(i).to_le_bytes().to_vec();
1927        values.push(FixedLenByteArray::from(ByteArray::from(value)));
1928    }
1929    values
1930}
1931
1932fn get_fsb_array_slice(
1933    array: &arrow_array::FixedSizeBinaryArray,
1934    indices: impl ExactSizeIterator<Item = usize>,
1935) -> Vec<FixedLenByteArray> {
1936    let mut values = Vec::with_capacity(indices.len());
1937    for i in indices {
1938        let value = array.value(i).to_vec();
1939        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1940    }
1941    values
1942}
1943
1944#[cfg(test)]
1945mod tests {
1946    use super::*;
1947    use std::collections::HashMap;
1948
1949    use std::fs::File;
1950
1951    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1952    use crate::arrow::{ARROW_SCHEMA_META_KEY, PARQUET_FIELD_ID_META_KEY};
1953    use crate::column::page::{Page, PageReader};
1954    use crate::file::metadata::thrift::PageHeader;
1955    use crate::file::page_index::column_index::ColumnIndexMetaData;
1956    use crate::file::reader::SerializedPageReader;
1957    use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
1958    use crate::schema::types::ColumnPath;
1959    use arrow::datatypes::ToByteSlice;
1960    use arrow::datatypes::{DataType, Schema};
1961    use arrow::error::Result as ArrowResult;
1962    use arrow::util::data_gen::create_random_array;
1963    use arrow::util::pretty::pretty_format_batches;
1964    use arrow::{array::*, buffer::Buffer};
1965    use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer, OffsetBuffer, i256};
1966    use arrow_schema::Fields;
1967    use half::f16;
1968    use num_traits::{FromPrimitive, ToPrimitive};
1969    use tempfile::tempfile;
1970
1971    use crate::basic::Encoding;
1972    use crate::data_type::AsBytes;
1973    use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
1974    use crate::file::properties::{
1975        BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1976    };
1977    use crate::file::serialized_reader::ReadOptionsBuilder;
1978    use crate::file::{
1979        reader::{FileReader, SerializedFileReader},
1980        statistics::Statistics,
1981    };
1982
1983    /// A [`PageStore`] that allocates *sparse, non-contiguous* handles and keeps
1984    /// blobs in a `HashMap` — nothing like the default `Vec<Bytes>`. Used to
1985    /// prove the writer relies only on the opaque-handle contract and never on
1986    /// handles being dense `Vec` indices. Records how many blobs were stored.
1987    #[derive(Debug, Default)]
1988    struct RecordingPageStore {
1989        next: u64,
1990        blobs: HashMap<u64, Bytes>,
1991        puts: Arc<std::sync::atomic::AtomicUsize>,
1992    }
1993
1994    impl PageStore for RecordingPageStore {
1995        fn put(&mut self, value: Bytes) -> Result<PageKey> {
1996            // Deliberately non-sequential, never-zero handles.
1997            let id = 100 + self.next * 7;
1998            self.next += 1;
1999            self.puts.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2000            self.blobs.insert(id, value);
2001            Ok(PageKey::new(id))
2002        }
2003
2004        fn take(&mut self, key: PageKey) -> Result<Bytes> {
2005            self.blobs
2006                .remove(&key.get())
2007                .ok_or_else(|| ParquetError::General(format!("missing key {}", key.get())))
2008        }
2009    }
2010
2011    #[derive(Debug)]
2012    struct RecordingPageStoreFactory {
2013        puts: Arc<std::sync::atomic::AtomicUsize>,
2014    }
2015
2016    impl PageStoreFactory for RecordingPageStoreFactory {
2017        fn create(&self, _args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
2018            Ok(Box::new(RecordingPageStore {
2019                puts: self.puts.clone(),
2020                ..Default::default()
2021            }))
2022        }
2023    }
2024
2025    /// A custom [`PageStore`] must produce byte-identical files to the in-memory
2026    /// default, across dictionary and non-dictionary columns and multiple row
2027    /// groups (so multiple store instances are exercised).
2028    #[test]
2029    fn custom_page_store_is_byte_identical_to_default() {
2030        let schema = Arc::new(Schema::new(vec![
2031            Field::new("i", DataType::Int32, true),
2032            // A low-cardinality string column to exercise the dictionary path.
2033            Field::new("s", DataType::Utf8, true),
2034        ]));
2035        let i = Int32Array::from(vec![Some(1), None, Some(3), Some(4), Some(5), Some(6)]);
2036        let s = StringArray::from(vec![
2037            Some("a"),
2038            Some("bb"),
2039            Some("a"),
2040            None,
2041            Some("bb"),
2042            Some("ccc"),
2043        ]);
2044        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(i), Arc::new(s)]).unwrap();
2045
2046        // Small row groups so multiple column chunks (hence multiple store
2047        // instances) are produced.
2048        let props = WriterProperties::builder()
2049            .set_max_row_group_row_count(Some(3))
2050            .build();
2051
2052        let write = |factory: Option<Arc<dyn PageStoreFactory>>| {
2053            let mut buffer = Vec::new();
2054            let mut opts = ArrowWriterOptions::new().with_properties(props.clone());
2055            if let Some(factory) = factory {
2056                opts = opts.with_page_store_factory(factory);
2057            }
2058            let mut writer =
2059                ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap();
2060            writer.write(&batch).unwrap();
2061            writer.close().unwrap();
2062            buffer
2063        };
2064
2065        let default_bytes = write(None);
2066
2067        let puts = Arc::new(std::sync::atomic::AtomicUsize::new(0));
2068        let custom_bytes = write(Some(Arc::new(RecordingPageStoreFactory {
2069            puts: puts.clone(),
2070        })));
2071
2072        assert!(
2073            puts.load(std::sync::atomic::Ordering::Relaxed) > 0,
2074            "custom PageStore was never written to"
2075        );
2076        assert_eq!(
2077            default_bytes, custom_bytes,
2078            "a custom PageStore must produce byte-identical output to the default"
2079        );
2080    }
2081
2082    /// A dictionary-encoded column written through the deferred-ordering Arrow
2083    /// path must round-trip correctly even with the offset index disabled, when
2084    /// only the chunk-level dictionary/data page offsets are rewritten (there is
2085    /// no offset index to rebuild). Spans multiple data pages so the
2086    /// dictionary-first reordering is exercised.
2087    #[test]
2088    fn dictionary_column_round_trips_with_offset_index_disabled() {
2089        let schema = Arc::new(Schema::new(vec![Field::new("k", DataType::Int32, true)]));
2090
2091        // Low cardinality so the column stays dictionary-encoded; enough rows to
2092        // span several data pages within a single row group.
2093        let values: Vec<Option<i32>> = (0..50_000).map(|i| Some(i % 8)).collect();
2094        let array = Int32Array::from(values.clone());
2095        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
2096
2097        let props = WriterProperties::builder()
2098            .set_offset_index_disabled(true)
2099            .set_data_page_row_count_limit(4096)
2100            .build();
2101        let opts = ArrowWriterOptions::new().with_properties(props);
2102
2103        let mut buffer = Vec::new();
2104        let mut writer =
2105            ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap();
2106        writer.write(&batch).unwrap();
2107        writer.close().unwrap();
2108
2109        let reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), values.len()).unwrap();
2110        let read: Vec<RecordBatch> = reader.collect::<ArrowResult<_>>().unwrap();
2111        let read_values: Vec<Option<i32>> = read
2112            .iter()
2113            .flat_map(|b| b.column(0).as_primitive::<Int32Type>().iter())
2114            .collect();
2115        assert_eq!(read_values, values);
2116    }
2117
2118    /// The dictionary page is routed through the [`PageStore`] like any other
2119    /// page rather than held resident in memory, so a dictionary column chunk's
2120    /// *entire* serialized size — dictionary page included — passes through the
2121    /// store.
2122    #[test]
2123    fn dictionary_page_is_routed_through_the_store() {
2124        /// A store that sums the bytes handed to `put`.
2125        #[derive(Debug, Default)]
2126        struct SizeRecordingPageStore {
2127            blobs: Vec<Bytes>,
2128            bytes_put: Arc<std::sync::atomic::AtomicUsize>,
2129        }
2130        impl PageStore for SizeRecordingPageStore {
2131            fn put(&mut self, value: Bytes) -> Result<PageKey> {
2132                self.bytes_put
2133                    .fetch_add(value.len(), std::sync::atomic::Ordering::Relaxed);
2134                let key = PageKey::new(self.blobs.len() as u64);
2135                self.blobs.push(value);
2136                Ok(key)
2137            }
2138            fn take(&mut self, key: PageKey) -> Result<Bytes> {
2139                Ok(std::mem::take(&mut self.blobs[key.get() as usize]))
2140            }
2141        }
2142        #[derive(Debug)]
2143        struct Factory {
2144            bytes_put: Arc<std::sync::atomic::AtomicUsize>,
2145        }
2146        impl PageStoreFactory for Factory {
2147            fn create(&self, _args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
2148                Ok(Box::new(SizeRecordingPageStore {
2149                    bytes_put: self.bytes_put.clone(),
2150                    ..Default::default()
2151                }))
2152            }
2153        }
2154
2155        let schema = Arc::new(Schema::new(vec![Field::new("s", DataType::Utf8, false)]));
2156        // Low cardinality keeps the column dictionary-encoded with a real,
2157        // non-empty dictionary page.
2158        let values: Vec<&str> = (0..2048)
2159            .map(|i| ["alpha", "beta", "gamma", "delta"][i % 4])
2160            .collect();
2161        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(StringArray::from(values))])
2162            .unwrap();
2163
2164        let bytes_put = Arc::new(std::sync::atomic::AtomicUsize::new(0));
2165        let opts = ArrowWriterOptions::new().with_page_store_factory(Arc::new(Factory {
2166            bytes_put: bytes_put.clone(),
2167        }));
2168
2169        // A single batch / single column means exactly one row group and one
2170        // store instance, so the bytes it saw map to one column chunk.
2171        let mut buffer = Vec::new();
2172        let mut writer =
2173            ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap();
2174        writer.write(&batch).unwrap();
2175        writer.close().unwrap();
2176
2177        let reader = SerializedFileReader::new(Bytes::from(buffer)).unwrap();
2178        let column = reader.metadata().row_group(0).column(0);
2179        assert!(
2180            column.dictionary_page_offset().is_some(),
2181            "expected the column to be dictionary-encoded"
2182        );
2183
2184        // The bytes the store was handed must account for the whole chunk,
2185        // dictionary page included. Holding the dictionary page apart from the
2186        // store would make this fall short by the dictionary page's size.
2187        assert_eq!(
2188            bytes_put.load(std::sync::atomic::Ordering::Relaxed) as i64,
2189            column.compressed_size(),
2190            "the dictionary page must pass through the store like any other page"
2191        );
2192    }
2193
2194    #[test]
2195    fn arrow_writer() {
2196        // define schema
2197        let schema = Schema::new(vec![
2198            Field::new("a", DataType::Int32, false),
2199            Field::new("b", DataType::Int32, true),
2200        ]);
2201
2202        // create some data
2203        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2204        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2205
2206        // build a record batch
2207        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
2208
2209        roundtrip(batch, Some(SMALL_SIZE / 2));
2210    }
2211
2212    fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
2213        let mut buffer = vec![];
2214
2215        let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
2216        writer.write(expected_batch).unwrap();
2217        writer.close().unwrap();
2218
2219        buffer
2220    }
2221
2222    fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
2223        let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
2224        writer.write(expected_batch).unwrap();
2225        writer.into_inner().unwrap()
2226    }
2227
2228    #[test]
2229    fn roundtrip_bytes() {
2230        // define schema
2231        let schema = Arc::new(Schema::new(vec![
2232            Field::new("a", DataType::Int32, false),
2233            Field::new("b", DataType::Int32, true),
2234        ]));
2235
2236        // create some data
2237        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2238        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2239
2240        // build a record batch
2241        let expected_batch =
2242            RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
2243
2244        for buffer in [
2245            get_bytes_after_close(schema.clone(), &expected_batch),
2246            get_bytes_by_into_inner(schema, &expected_batch),
2247        ] {
2248            let cursor = Bytes::from(buffer);
2249            let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
2250
2251            let actual_batch = record_batch_reader
2252                .next()
2253                .expect("No batch found")
2254                .expect("Unable to get batch");
2255
2256            assert_eq!(expected_batch.schema(), actual_batch.schema());
2257            assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
2258            assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
2259            for i in 0..expected_batch.num_columns() {
2260                let expected_data = expected_batch.column(i).to_data();
2261                let actual_data = actual_batch.column(i).to_data();
2262
2263                assert_eq!(expected_data, actual_data);
2264            }
2265        }
2266    }
2267
2268    #[test]
2269    fn arrow_writer_non_null() {
2270        // define schema
2271        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2272
2273        // create some data
2274        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2275
2276        // build a record batch
2277        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2278
2279        roundtrip(batch, Some(SMALL_SIZE / 2));
2280    }
2281
2282    #[test]
2283    fn arrow_writer_list() {
2284        // define schema
2285        let schema = Schema::new(vec![Field::new(
2286            "a",
2287            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
2288            true,
2289        )]);
2290
2291        // create some data
2292        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2293
2294        // Construct a buffer for value offsets, for the nested array:
2295        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
2296        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2297
2298        // Construct a list array from the above two
2299        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2300            DataType::Int32,
2301            false,
2302        ))))
2303        .len(5)
2304        .add_buffer(a_value_offsets)
2305        .add_child_data(a_values.into_data())
2306        .null_bit_buffer(Some(Buffer::from([0b00011011])))
2307        .build()
2308        .unwrap();
2309        let a = ListArray::from(a_list_data);
2310
2311        // build a record batch
2312        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2313
2314        assert_eq!(batch.column(0).null_count(), 1);
2315
2316        // This test fails if the max row group size is less than the batch's length
2317        // see https://github.com/apache/arrow-rs/issues/518
2318        roundtrip(batch, None);
2319    }
2320
2321    #[test]
2322    fn arrow_writer_list_non_null() {
2323        // define schema
2324        let schema = Schema::new(vec![Field::new(
2325            "a",
2326            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
2327            false,
2328        )]);
2329
2330        // create some data
2331        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2332
2333        // Construct a buffer for value offsets, for the nested array:
2334        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
2335        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2336
2337        // Construct a list array from the above two
2338        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2339            DataType::Int32,
2340            false,
2341        ))))
2342        .len(5)
2343        .add_buffer(a_value_offsets)
2344        .add_child_data(a_values.into_data())
2345        .build()
2346        .unwrap();
2347        let a = ListArray::from(a_list_data);
2348
2349        // build a record batch
2350        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2351
2352        // This test fails if the max row group size is less than the batch's length
2353        // see https://github.com/apache/arrow-rs/issues/518
2354        assert_eq!(batch.column(0).null_count(), 0);
2355
2356        roundtrip(batch, None);
2357    }
2358
2359    #[test]
2360    fn arrow_writer_list_view() {
2361        let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2362        let schema = Schema::new(vec![Field::new(
2363            "a",
2364            DataType::ListView(list_field.clone()),
2365            true,
2366        )]);
2367
2368        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
2369        let a = ListViewArray::new(
2370            list_field,
2371            vec![0, 1, 0, 3, 6].into(),
2372            vec![1, 2, 0, 3, 4].into(),
2373            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2374            Some(vec![true, true, false, true, true].into()),
2375        );
2376
2377        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2378
2379        assert_eq!(batch.column(0).null_count(), 1);
2380
2381        roundtrip(batch, None);
2382    }
2383
2384    #[test]
2385    fn arrow_writer_list_view_non_null() {
2386        let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2387        let schema = Schema::new(vec![Field::new(
2388            "a",
2389            DataType::ListView(list_field.clone()),
2390            false,
2391        )]);
2392
2393        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
2394        let a = ListViewArray::new(
2395            list_field,
2396            vec![0, 1, 0, 3, 6].into(),
2397            vec![1, 2, 0, 3, 4].into(),
2398            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2399            None,
2400        );
2401
2402        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2403
2404        assert_eq!(batch.column(0).null_count(), 0);
2405
2406        roundtrip(batch, None);
2407    }
2408
2409    #[test]
2410    fn arrow_writer_list_view_out_of_order() {
2411        let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2412        let schema = Schema::new(vec![Field::new(
2413            "a",
2414            DataType::ListView(list_field.clone()),
2415            false,
2416        )]);
2417
2418        // [[1], [2, 3], [], [7, 8, 9, 10], [4, 5, 6]] - out of order offsets
2419        let a = ListViewArray::new(
2420            list_field,
2421            vec![0, 1, 0, 6, 3].into(),
2422            vec![1, 2, 0, 4, 3].into(),
2423            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2424            None,
2425        );
2426
2427        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2428
2429        roundtrip(batch, None);
2430    }
2431
2432    #[test]
2433    fn arrow_writer_large_list_view() {
2434        let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2435        let schema = Schema::new(vec![Field::new(
2436            "a",
2437            DataType::LargeListView(list_field.clone()),
2438            true,
2439        )]);
2440
2441        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
2442        let a = LargeListViewArray::new(
2443            list_field,
2444            vec![0i64, 1, 0, 3, 6].into(),
2445            vec![1i64, 2, 0, 3, 4].into(),
2446            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2447            Some(vec![true, true, false, true, true].into()),
2448        );
2449
2450        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2451
2452        assert_eq!(batch.column(0).null_count(), 1);
2453
2454        roundtrip(batch, None);
2455    }
2456
2457    #[test]
2458    fn arrow_writer_list_view_with_struct() {
2459        // Test ListView containing Struct: ListView<Struct<Int32, Utf8>>
2460        let struct_fields = Fields::from(vec![
2461            Field::new("id", DataType::Int32, false),
2462            Field::new("name", DataType::Utf8, false),
2463        ]);
2464        let struct_type = DataType::Struct(struct_fields.clone());
2465        let list_field = Arc::new(Field::new("item", struct_type.clone(), false));
2466
2467        let schema = Schema::new(vec![Field::new(
2468            "a",
2469            DataType::ListView(list_field.clone()),
2470            true,
2471        )]);
2472
2473        // Create struct values
2474        let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
2475        let name_array = StringArray::from(vec!["a", "b", "c", "d", "e"]);
2476        let struct_array = StructArray::new(
2477            struct_fields,
2478            vec![Arc::new(id_array), Arc::new(name_array)],
2479            None,
2480        );
2481
2482        // Create ListView: [{1, "a"}, {2, "b"}], null, [{3, "c"}, {4, "d"}, {5, "e"}]
2483        let list_view = ListViewArray::new(
2484            list_field,
2485            vec![0, 2, 2].into(), // offsets
2486            vec![2, 0, 3].into(), // sizes
2487            Arc::new(struct_array),
2488            Some(vec![true, false, true].into()),
2489        );
2490
2491        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_view)]).unwrap();
2492
2493        roundtrip(batch, None);
2494    }
2495
2496    #[test]
2497    fn arrow_writer_binary() {
2498        let string_field = Field::new("a", DataType::Utf8, false);
2499        let binary_field = Field::new("b", DataType::Binary, false);
2500        let schema = Schema::new(vec![string_field, binary_field]);
2501
2502        let raw_string_values = vec!["foo", "bar", "baz", "quux"];
2503        let raw_binary_values = [
2504            b"foo".to_vec(),
2505            b"bar".to_vec(),
2506            b"baz".to_vec(),
2507            b"quux".to_vec(),
2508        ];
2509        let raw_binary_value_refs = raw_binary_values
2510            .iter()
2511            .map(|x| x.as_slice())
2512            .collect::<Vec<_>>();
2513
2514        let string_values = StringArray::from(raw_string_values.clone());
2515        let binary_values = BinaryArray::from(raw_binary_value_refs);
2516        let batch = RecordBatch::try_new(
2517            Arc::new(schema),
2518            vec![Arc::new(string_values), Arc::new(binary_values)],
2519        )
2520        .unwrap();
2521
2522        roundtrip(batch, Some(SMALL_SIZE / 2));
2523    }
2524
2525    #[test]
2526    fn arrow_writer_binary_view() {
2527        let string_field = Field::new("a", DataType::Utf8View, false);
2528        let binary_field = Field::new("b", DataType::BinaryView, false);
2529        let nullable_string_field = Field::new("a", DataType::Utf8View, true);
2530        let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
2531
2532        let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
2533        let raw_binary_values = vec![
2534            b"foo".to_vec(),
2535            b"bar".to_vec(),
2536            b"large payload over 12 bytes".to_vec(),
2537            b"lulu".to_vec(),
2538        ];
2539        let nullable_string_values =
2540            vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
2541
2542        let string_view_values = StringViewArray::from(raw_string_values);
2543        let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
2544        let nullable_string_view_values = StringViewArray::from(nullable_string_values);
2545        let batch = RecordBatch::try_new(
2546            Arc::new(schema),
2547            vec![
2548                Arc::new(string_view_values),
2549                Arc::new(binary_view_values),
2550                Arc::new(nullable_string_view_values),
2551            ],
2552        )
2553        .unwrap();
2554
2555        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
2556        roundtrip(batch, None);
2557    }
2558
2559    #[test]
2560    fn arrow_writer_binary_view_long_value() {
2561        let string_field = Field::new("a", DataType::Utf8View, false);
2562        let binary_field = Field::new("b", DataType::BinaryView, false);
2563        let schema = Schema::new(vec![string_field, binary_field]);
2564
2565        // There is special case validation for long values (greater than 128)
2566        // 128 encodes as 0x80 0x00 0x00 0x00 in little endian, which should
2567        // trigger the long-string UTF-8 validation branch in the plain decoder.
2568        let long = "a".repeat(128);
2569        let raw_string_values = vec!["foo", long.as_str(), "bar"];
2570        let raw_binary_values = vec![b"foo".to_vec(), long.as_bytes().to_vec(), b"bar".to_vec()];
2571
2572        let string_view_values: ArrayRef = Arc::new(StringViewArray::from(raw_string_values));
2573        let binary_view_values: ArrayRef =
2574            Arc::new(BinaryViewArray::from_iter_values(raw_binary_values));
2575
2576        one_column_roundtrip(Arc::clone(&string_view_values), false);
2577        one_column_roundtrip(Arc::clone(&binary_view_values), false);
2578
2579        let batch = RecordBatch::try_new(
2580            Arc::new(schema),
2581            vec![string_view_values, binary_view_values],
2582        )
2583        .unwrap();
2584
2585        // Disable dictionary to exercise plain encoding paths in the reader.
2586        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2587            let props = WriterProperties::builder()
2588                .set_writer_version(version)
2589                .set_dictionary_enabled(false)
2590                .build();
2591            roundtrip_opts(&batch, props);
2592        }
2593    }
2594
2595    fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
2596        let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
2597        let schema = Schema::new(vec![decimal_field]);
2598
2599        let decimal_values = vec![10_000, 50_000, 0, -100]
2600            .into_iter()
2601            .map(Some)
2602            .collect::<Decimal128Array>()
2603            .with_precision_and_scale(precision, scale)
2604            .unwrap();
2605
2606        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
2607    }
2608
2609    #[test]
2610    fn arrow_writer_decimal() {
2611        // int32 to store the decimal value
2612        let batch_int32_decimal = get_decimal_batch(5, 2);
2613        roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
2614        // int64 to store the decimal value
2615        let batch_int64_decimal = get_decimal_batch(12, 2);
2616        roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
2617        // fixed_length_byte_array to store the decimal value
2618        let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
2619        roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
2620    }
2621
    #[test]
    fn arrow_writer_complex() {
        // Round-trips a batch mixing top-level primitives with nested structs and lists:
        //
        //   a: Int32 (required)
        //   b: Int32 (nullable)
        //   c: Struct (required)
        //      d: Float64 (nullable)
        //      e: Struct (required)
        //         f: Float32 (nullable)
        //         g: List<Int16, items nullable> (required)
        //         h: List<Int16, items required> (nullable)
        //
        // define schema
        let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
        let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
        let struct_field_g = Arc::new(Field::new_list(
            "g",
            Field::new_list_field(DataType::Int16, true),
            false,
        ));
        let struct_field_h = Arc::new(Field::new_list(
            "h",
            Field::new_list_field(DataType::Int16, false),
            true,
        ));
        // The field `Arc`s are cloned here because they are reused below when
        // assembling the `StructArray`s for `e` and `c`.
        let struct_field_e = Arc::new(Field::new_struct(
            "e",
            vec![
                struct_field_f.clone(),
                struct_field_g.clone(),
                struct_field_h.clone(),
            ],
            false,
        ));
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, true),
            Field::new_struct(
                "c",
                vec![struct_field_d.clone(), struct_field_e.clone()],
                false,
            ),
        ]);

        // create some data
        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);

        // Child values shared by both list columns `g` and `h`.
        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

        // Construct a buffer for value offsets, for the nested array:
        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
        let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());

        // Construct a list array from the above two; the offsets are cloned so the
        // same buffer can be reused for `h` below.
        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
            .len(5)
            .add_buffer(g_value_offsets.clone())
            .add_child_data(g_value.to_data())
            .build()
            .unwrap();
        let g = ListArray::from(g_list_data);
        // The difference between g and h is that h has a null bitmap.
        // Bits 0b00011011 (least significant first) mark slot 2 as null, so h is
        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
        let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
            .len(5)
            .add_buffer(g_value_offsets)
            .add_child_data(g_value.to_data())
            .null_bit_buffer(Some(Buffer::from([0b00011011])))
            .build()
            .unwrap();
        let h = ListArray::from(h_list_data);

        let e = StructArray::from(vec![
            (struct_field_f, Arc::new(f) as ArrayRef),
            (struct_field_g, Arc::new(g) as ArrayRef),
            (struct_field_h, Arc::new(h) as ArrayRef),
        ]);

        let c = StructArray::from(vec![
            (struct_field_d, Arc::new(d) as ArrayRef),
            (struct_field_e, Arc::new(e) as ArrayRef),
        ]);

        // build a record batch
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
        )
        .unwrap();

        // Round-trip twice with two different `SMALL_SIZE`-derived arguments to
        // `roundtrip`.
        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
        roundtrip(batch, Some(SMALL_SIZE / 3));
    }
2707
2708    #[test]
2709    fn arrow_writer_complex_mixed() {
2710        // This test was added while investigating https://github.com/apache/arrow-rs/issues/244.
2711        // It was subsequently fixed while investigating https://github.com/apache/arrow-rs/issues/245.
2712
2713        // define schema
2714        let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
2715        let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
2716        let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
2717        let schema = Schema::new(vec![Field::new(
2718            "some_nested_object",
2719            DataType::Struct(Fields::from(vec![
2720                offset_field.clone(),
2721                partition_field.clone(),
2722                topic_field.clone(),
2723            ])),
2724            false,
2725        )]);
2726
2727        // create some data
2728        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
2729        let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2730        let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
2731
2732        let some_nested_object = StructArray::from(vec![
2733            (offset_field, Arc::new(offset) as ArrayRef),
2734            (partition_field, Arc::new(partition) as ArrayRef),
2735            (topic_field, Arc::new(topic) as ArrayRef),
2736        ]);
2737
2738        // build a record batch
2739        let batch =
2740            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
2741
2742        roundtrip(batch, Some(SMALL_SIZE / 2));
2743    }
2744
2745    #[test]
2746    fn arrow_writer_map() {
2747        // Note: we are using the JSON Arrow reader for brevity
2748        let json_content = r#"
2749        {"stocks":{"long": "$AAA", "short": "$BBB"}}
2750        {"stocks":{"long": null, "long": "$CCC", "short": null}}
2751        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
2752        "#;
2753        let entries_struct_type = DataType::Struct(Fields::from(vec![
2754            Field::new("key", DataType::Utf8, false),
2755            Field::new("value", DataType::Utf8, true),
2756        ]));
2757        let stocks_field = Field::new(
2758            "stocks",
2759            DataType::Map(
2760                Arc::new(Field::new("entries", entries_struct_type, false)),
2761                false,
2762            ),
2763            true,
2764        );
2765        let schema = Arc::new(Schema::new(vec![stocks_field]));
2766        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
2767        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
2768
2769        let batch = reader.next().unwrap().unwrap();
2770        roundtrip(batch, None);
2771    }
2772
2773    #[test]
2774    fn arrow_writer_2_level_struct() {
2775        // tests writing <struct<struct<primitive>>
2776        let field_c = Field::new("c", DataType::Int32, true);
2777        let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
2778        let type_a = DataType::Struct(vec![field_b.clone()].into());
2779        let field_a = Field::new("a", type_a, true);
2780        let schema = Schema::new(vec![field_a.clone()]);
2781
2782        // create data
2783        let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
2784        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2785            .len(6)
2786            .null_bit_buffer(Some(Buffer::from([0b00100111])))
2787            .add_child_data(c.into_data())
2788            .build()
2789            .unwrap();
2790        let b = StructArray::from(b_data);
2791        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2792            .len(6)
2793            .null_bit_buffer(Some(Buffer::from([0b00101111])))
2794            .add_child_data(b.into_data())
2795            .build()
2796            .unwrap();
2797        let a = StructArray::from(a_data);
2798
2799        assert_eq!(a.null_count(), 1);
2800        assert_eq!(a.column(0).null_count(), 2);
2801
2802        // build a racord batch
2803        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2804
2805        roundtrip(batch, Some(SMALL_SIZE / 2));
2806    }
2807
2808    #[test]
2809    fn arrow_writer_2_level_struct_non_null() {
2810        // tests writing <struct<struct<primitive>>
2811        let field_c = Field::new("c", DataType::Int32, false);
2812        let type_b = DataType::Struct(vec![field_c].into());
2813        let field_b = Field::new("b", type_b.clone(), false);
2814        let type_a = DataType::Struct(vec![field_b].into());
2815        let field_a = Field::new("a", type_a.clone(), false);
2816        let schema = Schema::new(vec![field_a]);
2817
2818        // create data
2819        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2820        let b_data = ArrayDataBuilder::new(type_b)
2821            .len(6)
2822            .add_child_data(c.into_data())
2823            .build()
2824            .unwrap();
2825        let b = StructArray::from(b_data);
2826        let a_data = ArrayDataBuilder::new(type_a)
2827            .len(6)
2828            .add_child_data(b.into_data())
2829            .build()
2830            .unwrap();
2831        let a = StructArray::from(a_data);
2832
2833        assert_eq!(a.null_count(), 0);
2834        assert_eq!(a.column(0).null_count(), 0);
2835
2836        // build a racord batch
2837        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2838
2839        roundtrip(batch, Some(SMALL_SIZE / 2));
2840    }
2841
2842    #[test]
2843    fn arrow_writer_2_level_struct_mixed_null() {
2844        // tests writing <struct<struct<primitive>>
2845        let field_c = Field::new("c", DataType::Int32, false);
2846        let type_b = DataType::Struct(vec![field_c].into());
2847        let field_b = Field::new("b", type_b.clone(), true);
2848        let type_a = DataType::Struct(vec![field_b].into());
2849        let field_a = Field::new("a", type_a.clone(), false);
2850        let schema = Schema::new(vec![field_a]);
2851
2852        // create data
2853        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2854        let b_data = ArrayDataBuilder::new(type_b)
2855            .len(6)
2856            .null_bit_buffer(Some(Buffer::from([0b00100111])))
2857            .add_child_data(c.into_data())
2858            .build()
2859            .unwrap();
2860        let b = StructArray::from(b_data);
2861        // a intentionally has no null buffer, to test that this is handled correctly
2862        let a_data = ArrayDataBuilder::new(type_a)
2863            .len(6)
2864            .add_child_data(b.into_data())
2865            .build()
2866            .unwrap();
2867        let a = StructArray::from(a_data);
2868
2869        assert_eq!(a.null_count(), 0);
2870        assert_eq!(a.column(0).null_count(), 2);
2871
2872        // build a racord batch
2873        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2874
2875        roundtrip(batch, Some(SMALL_SIZE / 2));
2876    }
2877
2878    #[test]
2879    fn arrow_writer_2_level_struct_mixed_null_2() {
2880        // tests writing <struct<struct<primitive>>, where the primitive columns are non-null.
2881        let field_c = Field::new("c", DataType::Int32, false);
2882        let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
2883        let field_e = Field::new(
2884            "e",
2885            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2886            false,
2887        );
2888
2889        let field_b = Field::new(
2890            "b",
2891            DataType::Struct(vec![field_c, field_d, field_e].into()),
2892            false,
2893        );
2894        let type_a = DataType::Struct(vec![field_b.clone()].into());
2895        let field_a = Field::new("a", type_a, true);
2896        let schema = Schema::new(vec![field_a.clone()]);
2897
2898        // create data
2899        let c = Int32Array::from_iter_values(0..6);
2900        let d = FixedSizeBinaryArray::try_from_iter(
2901            ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
2902        )
2903        .expect("four byte values");
2904        let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
2905        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2906            .len(6)
2907            .add_child_data(c.into_data())
2908            .add_child_data(d.into_data())
2909            .add_child_data(e.into_data())
2910            .build()
2911            .unwrap();
2912        let b = StructArray::from(b_data);
2913        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2914            .len(6)
2915            .null_bit_buffer(Some(Buffer::from([0b00100101])))
2916            .add_child_data(b.into_data())
2917            .build()
2918            .unwrap();
2919        let a = StructArray::from(a_data);
2920
2921        assert_eq!(a.null_count(), 3);
2922        assert_eq!(a.column(0).null_count(), 0);
2923
2924        // build a record batch
2925        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2926
2927        roundtrip(batch, Some(SMALL_SIZE / 2));
2928    }
2929
2930    #[test]
2931    fn test_fixed_size_binary_in_dict() {
2932        fn test_fixed_size_binary_in_dict_inner<K>()
2933        where
2934            K: ArrowDictionaryKeyType,
2935            K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
2936            <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
2937        {
2938            let field = Field::new(
2939                "a",
2940                DataType::Dictionary(
2941                    Box::new(K::DATA_TYPE),
2942                    Box::new(DataType::FixedSizeBinary(4)),
2943                ),
2944                false,
2945            );
2946            let schema = Schema::new(vec![field]);
2947
2948            let keys: Vec<K::Native> = vec![
2949                K::Native::try_from(0u8).unwrap(),
2950                K::Native::try_from(0u8).unwrap(),
2951                K::Native::try_from(1u8).unwrap(),
2952            ];
2953            let keys = PrimitiveArray::<K>::from_iter_values(keys);
2954            let values = FixedSizeBinaryArray::try_from_iter(
2955                vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
2956            )
2957            .unwrap();
2958
2959            let data = DictionaryArray::<K>::new(keys, Arc::new(values));
2960            let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
2961            roundtrip(batch, None);
2962        }
2963
2964        test_fixed_size_binary_in_dict_inner::<UInt8Type>();
2965        test_fixed_size_binary_in_dict_inner::<UInt16Type>();
2966        test_fixed_size_binary_in_dict_inner::<UInt32Type>();
2967        test_fixed_size_binary_in_dict_inner::<UInt16Type>();
2968        test_fixed_size_binary_in_dict_inner::<Int8Type>();
2969        test_fixed_size_binary_in_dict_inner::<Int16Type>();
2970        test_fixed_size_binary_in_dict_inner::<Int32Type>();
2971        test_fixed_size_binary_in_dict_inner::<Int64Type>();
2972    }
2973
2974    #[test]
2975    fn test_empty_dict() {
2976        let struct_fields = Fields::from(vec![Field::new(
2977            "dict",
2978            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2979            false,
2980        )]);
2981
2982        let schema = Schema::new(vec![Field::new_struct(
2983            "struct",
2984            struct_fields.clone(),
2985            true,
2986        )]);
2987        let dictionary = Arc::new(DictionaryArray::new(
2988            Int32Array::new_null(5),
2989            Arc::new(StringArray::new_null(0)),
2990        ));
2991
2992        let s = StructArray::new(
2993            struct_fields,
2994            vec![dictionary],
2995            Some(NullBuffer::new_null(5)),
2996        );
2997
2998        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
2999        roundtrip(batch, None);
3000    }
3001    #[test]
3002    fn arrow_writer_page_size() {
3003        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
3004
3005        let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
3006
3007        // Generate an array of 10 unique 10 character string
3008        for i in 0..10 {
3009            let value = i
3010                .to_string()
3011                .repeat(10)
3012                .chars()
3013                .take(10)
3014                .collect::<String>();
3015
3016            builder.append_value(value);
3017        }
3018
3019        let array = Arc::new(builder.finish());
3020
3021        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
3022
3023        let file = tempfile::tempfile().unwrap();
3024
3025        // Set everything very low so we fallback to PLAIN encoding after the first row
3026        let props = WriterProperties::builder()
3027            .set_data_page_size_limit(1)
3028            .set_dictionary_page_size_limit(1)
3029            .set_write_batch_size(1)
3030            .build();
3031
3032        let mut writer =
3033            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
3034                .expect("Unable to write file");
3035        writer.write(&batch).unwrap();
3036        writer.close().unwrap();
3037
3038        let options = ReadOptionsBuilder::new().with_page_index().build();
3039        let reader =
3040            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap();
3041
3042        let column = reader.metadata().row_group(0).columns();
3043
3044        assert_eq!(column.len(), 1);
3045
3046        // We should write one row before falling back to PLAIN encoding so there should still be a
3047        // dictionary page.
3048        assert!(
3049            column[0].dictionary_page_offset().is_some(),
3050            "Expected a dictionary page"
3051        );
3052
3053        assert!(reader.metadata().offset_index().is_some());
3054        let offset_indexes = &reader.metadata().offset_index().unwrap()[0];
3055
3056        let page_locations = offset_indexes[0].page_locations.clone();
3057
3058        // We should fallback to PLAIN encoding after the first row and our max page size is 1 bytes
3059        // so we expect one dictionary encoded page and then a page per row thereafter.
3060        assert_eq!(
3061            page_locations.len(),
3062            10,
3063            "Expected 10 pages but got {page_locations:#?}"
3064        );
3065    }
3066
3067    #[test]
3068    fn arrow_writer_float_nans() {
3069        let f16_field = Field::new("a", DataType::Float16, false);
3070        let f32_field = Field::new("b", DataType::Float32, false);
3071        let f64_field = Field::new("c", DataType::Float64, false);
3072        let schema = Schema::new(vec![f16_field, f32_field, f64_field]);
3073
3074        let f16_values = (0..MEDIUM_SIZE)
3075            .map(|i| {
3076                Some(if i % 2 == 0 {
3077                    f16::NAN
3078                } else {
3079                    f16::from_f32(i as f32)
3080                })
3081            })
3082            .collect::<Float16Array>();
3083
3084        let f32_values = (0..MEDIUM_SIZE)
3085            .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
3086            .collect::<Float32Array>();
3087
3088        let f64_values = (0..MEDIUM_SIZE)
3089            .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
3090            .collect::<Float64Array>();
3091
3092        let batch = RecordBatch::try_new(
3093            Arc::new(schema),
3094            vec![
3095                Arc::new(f16_values),
3096                Arc::new(f32_values),
3097                Arc::new(f64_values),
3098            ],
3099        )
3100        .unwrap();
3101
3102        roundtrip(batch, None);
3103    }
3104
    /// Row count used by the small single-column round-trip tests in this module.
    const SMALL_SIZE: usize = 7;
    /// Row count used by the medium-sized round-trip tests (e.g. `arrow_writer_float_nans`).
    const MEDIUM_SIZE: usize = 63;
3107
3108    // Write the batch to parquet and read it back out, ensuring
3109    // that what comes out is the same as what was written in
3110    fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<Bytes> {
3111        let mut files = vec![];
3112        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
3113            let mut props = WriterProperties::builder().set_writer_version(version);
3114
3115            if let Some(size) = max_row_group_size {
3116                props = props.set_max_row_group_row_count(Some(size))
3117            }
3118
3119            let props = props.build();
3120            files.push(roundtrip_opts(&expected_batch, props))
3121        }
3122        files
3123    }
3124
3125    // Round trip the specified record batch with the specified writer properties,
3126    // to an in-memory file, and validate the arrays using the specified function.
3127    // Returns the in-memory file.
3128    fn roundtrip_opts_with_array_validation<F>(
3129        expected_batch: &RecordBatch,
3130        props: WriterProperties,
3131        validate: F,
3132    ) -> Bytes
3133    where
3134        F: Fn(&ArrayData, &ArrayData),
3135    {
3136        let mut file = vec![];
3137
3138        let mut writer = ArrowWriter::try_new(&mut file, expected_batch.schema(), Some(props))
3139            .expect("Unable to write file");
3140        writer.write(expected_batch).unwrap();
3141        writer.close().unwrap();
3142
3143        let file = Bytes::from(file);
3144        let mut record_batch_reader =
3145            ParquetRecordBatchReader::try_new(file.clone(), 1024).unwrap();
3146
3147        let actual_batch = record_batch_reader
3148            .next()
3149            .expect("No batch found")
3150            .expect("Unable to get batch");
3151
3152        assert_eq!(expected_batch.schema(), actual_batch.schema());
3153        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
3154        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
3155        for i in 0..expected_batch.num_columns() {
3156            let expected_data = expected_batch.column(i).to_data();
3157            let actual_data = actual_batch.column(i).to_data();
3158            validate(&expected_data, &actual_data);
3159        }
3160
3161        file
3162    }
3163
3164    fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> Bytes {
3165        roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
3166            a.validate_full().expect("valid expected data");
3167            b.validate_full().expect("valid actual data");
3168            assert_eq!(a, b)
3169        })
3170    }
3171
3172    struct RoundTripOptions {
3173        values: ArrayRef,
3174        schema: SchemaRef,
3175        bloom_filter: bool,
3176        bloom_filter_ndv: Option<u64>,
3177        bloom_filter_position: BloomFilterPosition,
3178    }
3179
3180    impl RoundTripOptions {
3181        fn new(values: ArrayRef, nullable: bool) -> Self {
3182            let data_type = values.data_type().clone();
3183            let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
3184            Self {
3185                values,
3186                schema: Arc::new(schema),
3187                bloom_filter: false,
3188                bloom_filter_ndv: None,
3189                bloom_filter_position: BloomFilterPosition::AfterRowGroup,
3190            }
3191        }
3192    }
3193
3194    fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<Bytes> {
3195        one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
3196    }
3197
3198    fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<Bytes> {
3199        let mut options = RoundTripOptions::new(values, false);
3200        options.schema = schema;
3201        one_column_roundtrip_with_options(options)
3202    }
3203
3204    fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<Bytes> {
3205        let RoundTripOptions {
3206            values,
3207            schema,
3208            bloom_filter,
3209            bloom_filter_ndv,
3210            bloom_filter_position,
3211        } = options;
3212
3213        let encodings = match values.data_type() {
3214            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
3215                vec![
3216                    Encoding::PLAIN,
3217                    Encoding::DELTA_BYTE_ARRAY,
3218                    Encoding::DELTA_LENGTH_BYTE_ARRAY,
3219                ]
3220            }
3221            DataType::Int64
3222            | DataType::Int32
3223            | DataType::Int16
3224            | DataType::Int8
3225            | DataType::UInt64
3226            | DataType::UInt32
3227            | DataType::UInt16
3228            | DataType::UInt8 => vec![
3229                Encoding::PLAIN,
3230                Encoding::DELTA_BINARY_PACKED,
3231                Encoding::BYTE_STREAM_SPLIT,
3232            ],
3233            DataType::Float32 | DataType::Float64 => {
3234                vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
3235            }
3236            _ => vec![Encoding::PLAIN],
3237        };
3238
3239        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3240
3241        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
3242
3243        let mut files = vec![];
3244        for dictionary_size in [0, 1, 1024] {
3245            for encoding in &encodings {
3246                for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
3247                    for row_group_size in row_group_sizes {
3248                        let mut builder = WriterProperties::builder()
3249                            .set_writer_version(version)
3250                            .set_max_row_group_row_count(Some(row_group_size))
3251                            .set_dictionary_enabled(dictionary_size != 0)
3252                            .set_dictionary_page_size_limit(dictionary_size.max(1))
3253                            .set_encoding(*encoding)
3254                            .set_bloom_filter_enabled(bloom_filter)
3255                            .set_bloom_filter_position(bloom_filter_position);
3256                        if let Some(ndv) = bloom_filter_ndv {
3257                            builder = builder.set_bloom_filter_max_ndv(ndv);
3258                        }
3259                        let props = builder.build();
3260
3261                        files.push(roundtrip_opts(&expected_batch, props))
3262                    }
3263                }
3264            }
3265        }
3266        files
3267    }
3268
3269    fn values_required<A, I>(iter: I) -> Vec<Bytes>
3270    where
3271        A: From<Vec<I::Item>> + Array + 'static,
3272        I: IntoIterator,
3273    {
3274        let raw_values: Vec<_> = iter.into_iter().collect();
3275        let values = Arc::new(A::from(raw_values));
3276        one_column_roundtrip(values, false)
3277    }
3278
3279    fn values_optional<A, I>(iter: I) -> Vec<Bytes>
3280    where
3281        A: From<Vec<Option<I::Item>>> + Array + 'static,
3282        I: IntoIterator,
3283    {
3284        let optional_raw_values: Vec<_> = iter
3285            .into_iter()
3286            .enumerate()
3287            .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
3288            .collect();
3289        let optional_values = Arc::new(A::from(optional_raw_values));
3290        one_column_roundtrip(optional_values, true)
3291    }
3292
3293    fn required_and_optional<A, I>(iter: I)
3294    where
3295        A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
3296        I: IntoIterator + Clone,
3297    {
3298        values_required::<A, I>(iter.clone());
3299        values_optional::<A, I>(iter);
3300    }
3301
3302    fn check_bloom_filter<T: AsBytes>(
3303        files: Vec<Bytes>,
3304        file_column: String,
3305        positive_values: Vec<T>,
3306        negative_values: Vec<T>,
3307    ) {
3308        files.into_iter().take(1).for_each(|file| {
3309            let file_reader = SerializedFileReader::new_with_options(
3310                file,
3311                ReadOptionsBuilder::new()
3312                    .with_reader_properties(
3313                        ReaderProperties::builder()
3314                            .set_read_bloom_filter(true)
3315                            .build(),
3316                    )
3317                    .build(),
3318            )
3319            .expect("Unable to open file as Parquet");
3320            let metadata = file_reader.metadata();
3321
3322            // Gets bloom filters from all row groups.
3323            let mut bloom_filters: Vec<_> = vec![];
3324            for (ri, row_group) in metadata.row_groups().iter().enumerate() {
3325                if let Some((column_index, _)) = row_group
3326                    .columns()
3327                    .iter()
3328                    .enumerate()
3329                    .find(|(_, column)| column.column_path().string() == file_column)
3330                {
3331                    let row_group_reader = file_reader
3332                        .get_row_group(ri)
3333                        .expect("Unable to read row group");
3334                    if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
3335                        bloom_filters.push(sbbf.clone());
3336                    } else {
3337                        panic!("No bloom filter for column named {file_column} found");
3338                    }
3339                } else {
3340                    panic!("No column named {file_column} found");
3341                }
3342            }
3343
3344            positive_values.iter().for_each(|value| {
3345                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
3346                assert!(
3347                    found.is_some(),
3348                    "{}",
3349                    format!("Value {:?} should be in bloom filter", value.as_bytes())
3350                );
3351            });
3352
3353            negative_values.iter().for_each(|value| {
3354                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
3355                assert!(
3356                    found.is_none(),
3357                    "{}",
3358                    format!("Value {:?} should not be in bloom filter", value.as_bytes())
3359                );
3360            });
3361        });
3362    }
3363
3364    #[test]
3365    fn all_null_primitive_single_column() {
3366        let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
3367        one_column_roundtrip(values, true);
3368    }
3369    #[test]
3370    fn null_single_column() {
3371        let values = Arc::new(NullArray::new(SMALL_SIZE));
3372        one_column_roundtrip(values, true);
3373        // null arrays are always nullable, a test with non-nullable nulls fails
3374    }
3375
3376    #[test]
3377    fn bool_single_column() {
3378        required_and_optional::<BooleanArray, _>(
3379            [true, false].iter().cycle().copied().take(SMALL_SIZE),
3380        );
3381    }
3382
3383    #[test]
3384    fn bool_large_single_column() {
3385        let values = Arc::new(
3386            [None, Some(true), Some(false)]
3387                .iter()
3388                .cycle()
3389                .copied()
3390                .take(200_000)
3391                .collect::<BooleanArray>(),
3392        );
3393        let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
3394        let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
3395        let file = tempfile::tempfile().unwrap();
3396
3397        let mut writer =
3398            ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
3399                .expect("Unable to write file");
3400        writer.write(&expected_batch).unwrap();
3401        writer.close().unwrap();
3402    }
3403
3404    #[test]
3405    fn check_page_offset_index_with_nan() {
3406        let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
3407        let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
3408        let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
3409
3410        let mut out = Vec::with_capacity(1024);
3411        let mut writer =
3412            ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
3413        writer.write(&batch).unwrap();
3414        let file_meta_data = writer.close().unwrap();
3415        for row_group in file_meta_data.row_groups() {
3416            for column in row_group.columns() {
3417                assert!(column.offset_index_offset().is_some());
3418                assert!(column.offset_index_length().is_some());
3419                assert!(column.column_index_offset().is_none());
3420                assert!(column.column_index_length().is_none());
3421            }
3422        }
3423    }
3424
3425    #[test]
3426    fn i8_single_column() {
3427        required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
3428    }
3429
3430    #[test]
3431    fn i16_single_column() {
3432        required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
3433    }
3434
3435    #[test]
3436    fn i32_single_column() {
3437        required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
3438    }
3439
3440    #[test]
3441    fn i64_single_column() {
3442        required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
3443    }
3444
3445    #[test]
3446    fn u8_single_column() {
3447        required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
3448    }
3449
3450    #[test]
3451    fn u16_single_column() {
3452        required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
3453    }
3454
3455    #[test]
3456    fn u32_single_column() {
3457        required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
3458    }
3459
3460    #[test]
3461    fn u64_single_column() {
3462        required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
3463    }
3464
3465    #[test]
3466    fn f32_single_column() {
3467        required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
3468    }
3469
3470    #[test]
3471    fn f64_single_column() {
3472        required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
3473    }
3474
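    // The timestamp array types don't implement From<Vec<T>> because they need the timezone
    // argument, and they also don't support building from a Vec<Option<T>>, so call
    // one_column_roundtrip manually instead of calling required_and_optional for these tests.
    // NOTE(review): the tests below do build these arrays via `From<Vec<i64>>`, so the first
    // part of this rationale may be outdated; confirm before relying on it.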
3478
3479    #[test]
3480    fn timestamp_second_single_column() {
3481        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
3482        let values = Arc::new(TimestampSecondArray::from(raw_values));
3483
3484        one_column_roundtrip(values, false);
3485    }
3486
3487    #[test]
3488    fn timestamp_millisecond_single_column() {
3489        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
3490        let values = Arc::new(TimestampMillisecondArray::from(raw_values));
3491
3492        one_column_roundtrip(values, false);
3493    }
3494
3495    #[test]
3496    fn timestamp_microsecond_single_column() {
3497        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
3498        let values = Arc::new(TimestampMicrosecondArray::from(raw_values));
3499
3500        one_column_roundtrip(values, false);
3501    }
3502
3503    #[test]
3504    fn timestamp_nanosecond_single_column() {
3505        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
3506        let values = Arc::new(TimestampNanosecondArray::from(raw_values));
3507
3508        one_column_roundtrip(values, false);
3509    }
3510
3511    #[test]
3512    fn date32_single_column() {
3513        required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
3514    }
3515
3516    #[test]
3517    fn date64_single_column() {
3518        // Date64 must be a multiple of 86400000, see ARROW-10925
3519        required_and_optional::<Date64Array, _>(
3520            (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
3521        );
3522    }
3523
3524    #[test]
3525    fn time32_second_single_column() {
3526        required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
3527    }
3528
3529    #[test]
3530    fn time32_millisecond_single_column() {
3531        required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
3532    }
3533
3534    #[test]
3535    fn time64_microsecond_single_column() {
3536        required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
3537    }
3538
3539    #[test]
3540    fn time64_nanosecond_single_column() {
3541        required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
3542    }
3543
3544    #[test]
3545    fn duration_second_single_column() {
3546        required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
3547    }
3548
3549    #[test]
3550    fn duration_millisecond_single_column() {
3551        required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
3552    }
3553
3554    #[test]
3555    fn duration_microsecond_single_column() {
3556        required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
3557    }
3558
3559    #[test]
3560    fn duration_nanosecond_single_column() {
3561        required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
3562    }
3563
3564    #[test]
3565    fn interval_year_month_single_column() {
3566        required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
3567    }
3568
3569    #[test]
3570    fn interval_day_time_single_column() {
3571        required_and_optional::<IntervalDayTimeArray, _>(vec![
3572            IntervalDayTime::new(0, 1),
3573            IntervalDayTime::new(0, 3),
3574            IntervalDayTime::new(3, -2),
3575            IntervalDayTime::new(-200, 4),
3576        ]);
3577    }
3578
3579    #[test]
3580    #[should_panic(
3581        expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
3582    )]
3583    fn interval_month_day_nano_single_column() {
3584        required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
3585            IntervalMonthDayNano::new(0, 1, 5),
3586            IntervalMonthDayNano::new(0, 3, 2),
3587            IntervalMonthDayNano::new(3, -2, -5),
3588            IntervalMonthDayNano::new(-200, 4, -1),
3589        ]);
3590    }
3591
3592    #[test]
3593    fn binary_single_column() {
3594        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
3595        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
3596        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
3597
3598        // BinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
3599        values_required::<BinaryArray, _>(many_vecs_iter);
3600    }
3601
3602    #[test]
3603    fn binary_view_single_column() {
3604        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
3605        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
3606        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
3607
3608        // BinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
3609        values_required::<BinaryViewArray, _>(many_vecs_iter);
3610    }
3611
3612    #[test]
3613    fn i32_column_bloom_filter_at_end() {
3614        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
3615        let mut options = RoundTripOptions::new(array, false);
3616        options.bloom_filter = true;
3617        options.bloom_filter_position = BloomFilterPosition::End;
3618
3619        let files = one_column_roundtrip_with_options(options);
3620        check_bloom_filter(
3621            files,
3622            "col".to_string(),
3623            (0..SMALL_SIZE as i32).collect(),
3624            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
3625        );
3626    }
3627
3628    #[test]
3629    fn i32_column_bloom_filter() {
3630        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
3631        let mut options = RoundTripOptions::new(array, false);
3632        options.bloom_filter = true;
3633
3634        let files = one_column_roundtrip_with_options(options);
3635        check_bloom_filter(
3636            files,
3637            "col".to_string(),
3638            (0..SMALL_SIZE as i32).collect(),
3639            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
3640        );
3641    }
3642
3643    /// Test that bloom filter folding produces correct results even when
3644    /// the configured NDV differs significantly from actual NDV.
3645    /// A large NDV means a larger initial filter that gets folded down;
3646    /// a small NDV means a smaller initial filter.
3647    #[test]
3648    fn i32_column_bloom_filter_fixed_ndv() {
3649        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
3650
3651        // NDV much larger than actual distinct values — tests folding a large filter down
3652        let mut options = RoundTripOptions::new(array.clone(), false);
3653        options.bloom_filter = true;
3654        options.bloom_filter_ndv = Some(1_000_000);
3655
3656        let files = one_column_roundtrip_with_options(options);
3657        check_bloom_filter(
3658            files,
3659            "col".to_string(),
3660            (0..SMALL_SIZE as i32).collect(),
3661            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
3662        );
3663
3664        // NDV smaller than actual distinct values — tests the underestimate path
3665        let mut options = RoundTripOptions::new(array, false);
3666        options.bloom_filter = true;
3667        options.bloom_filter_ndv = Some(3);
3668
3669        let files = one_column_roundtrip_with_options(options);
3670        check_bloom_filter(
3671            files,
3672            "col".to_string(),
3673            (0..SMALL_SIZE as i32).collect(),
3674            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
3675        );
3676    }
3677
3678    #[test]
3679    fn binary_column_bloom_filter() {
3680        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
3681        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
3682        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
3683
3684        let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
3685        let mut options = RoundTripOptions::new(array, false);
3686        options.bloom_filter = true;
3687
3688        let files = one_column_roundtrip_with_options(options);
3689        check_bloom_filter(
3690            files,
3691            "col".to_string(),
3692            many_vecs,
3693            vec![vec![(SMALL_SIZE + 1) as u8]],
3694        );
3695    }
3696
3697    #[test]
3698    fn empty_string_null_column_bloom_filter() {
3699        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
3700        let raw_strs = raw_values.iter().map(|s| s.as_str());
3701
3702        let array = Arc::new(StringArray::from_iter_values(raw_strs));
3703        let mut options = RoundTripOptions::new(array, false);
3704        options.bloom_filter = true;
3705
3706        let files = one_column_roundtrip_with_options(options);
3707
3708        let optional_raw_values: Vec<_> = raw_values
3709            .iter()
3710            .enumerate()
3711            .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
3712            .collect();
3713        // For null slots, empty string should not be in bloom filter.
3714        check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
3715    }
3716
3717    #[test]
3718    fn large_binary_single_column() {
3719        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
3720        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
3721        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
3722
3723        // LargeBinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
3724        values_required::<LargeBinaryArray, _>(many_vecs_iter);
3725    }
3726
3727    #[test]
3728    fn fixed_size_binary_single_column() {
3729        let mut builder = FixedSizeBinaryBuilder::new(4);
3730        builder.append_value(b"0123").unwrap();
3731        builder.append_null();
3732        builder.append_value(b"8910").unwrap();
3733        builder.append_value(b"1112").unwrap();
3734        let array = Arc::new(builder.finish());
3735
3736        one_column_roundtrip(array, true);
3737    }
3738
3739    #[test]
3740    fn string_single_column() {
3741        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
3742        let raw_strs = raw_values.iter().map(|s| s.as_str());
3743
3744        required_and_optional::<StringArray, _>(raw_strs);
3745    }
3746
3747    #[test]
3748    fn large_string_single_column() {
3749        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
3750        let raw_strs = raw_values.iter().map(|s| s.as_str());
3751
3752        required_and_optional::<LargeStringArray, _>(raw_strs);
3753    }
3754
3755    #[test]
3756    fn string_view_single_column() {
3757        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
3758        let raw_strs = raw_values.iter().map(|s| s.as_str());
3759
3760        required_and_optional::<StringViewArray, _>(raw_strs);
3761    }
3762
3763    #[test]
3764    fn null_list_single_column() {
3765        let null_field = Field::new_list_field(DataType::Null, true);
3766        let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);
3767
3768        let schema = Schema::new(vec![list_field]);
3769
3770        // Build [[], null, [null, null]]
3771        let a_values = NullArray::new(2);
3772        let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
3773        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
3774            DataType::Null,
3775            true,
3776        ))))
3777        .len(3)
3778        .add_buffer(a_value_offsets)
3779        .null_bit_buffer(Some(Buffer::from([0b00000101])))
3780        .add_child_data(a_values.into_data())
3781        .build()
3782        .unwrap();
3783
3784        let a = ListArray::from(a_list_data);
3785
3786        assert!(a.is_valid(0));
3787        assert!(!a.is_valid(1));
3788        assert!(a.is_valid(2));
3789
3790        assert_eq!(a.value(0).len(), 0);
3791        assert_eq!(a.value(2).len(), 2);
3792        assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);
3793
3794        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
3795        roundtrip(batch, None);
3796    }
3797
3798    #[test]
3799    fn list_single_column() {
3800        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3801        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
3802        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
3803            DataType::Int32,
3804            false,
3805        ))))
3806        .len(5)
3807        .add_buffer(a_value_offsets)
3808        .null_bit_buffer(Some(Buffer::from([0b00011011])))
3809        .add_child_data(a_values.into_data())
3810        .build()
3811        .unwrap();
3812
3813        assert_eq!(a_list_data.null_count(), 1);
3814
3815        let a = ListArray::from(a_list_data);
3816        let values = Arc::new(a);
3817
3818        one_column_roundtrip(values, true);
3819    }
3820
3821    #[test]
3822    fn large_list_single_column() {
3823        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3824        let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
3825        let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
3826            "large_item",
3827            DataType::Int32,
3828            true,
3829        ))))
3830        .len(5)
3831        .add_buffer(a_value_offsets)
3832        .add_child_data(a_values.into_data())
3833        .null_bit_buffer(Some(Buffer::from([0b00011011])))
3834        .build()
3835        .unwrap();
3836
3837        // I think this setup is incorrect because this should pass
3838        assert_eq!(a_list_data.null_count(), 1);
3839
3840        let a = LargeListArray::from(a_list_data);
3841        let values = Arc::new(a);
3842
3843        one_column_roundtrip(values, true);
3844    }
3845
3846    #[test]
3847    fn list_nested_nulls() {
3848        use arrow::datatypes::Int32Type;
3849        let data = vec![
3850            Some(vec![Some(1)]),
3851            Some(vec![Some(2), Some(3)]),
3852            None,
3853            Some(vec![Some(4), Some(5), None]),
3854            Some(vec![None]),
3855            Some(vec![Some(6), Some(7)]),
3856        ];
3857
3858        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
3859        one_column_roundtrip(Arc::new(list), true);
3860
3861        let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
3862        one_column_roundtrip(Arc::new(list), true);
3863    }
3864
3865    #[test]
3866    fn list_utf8_view_selective_padding_roundtrip() {
3867        let item = Arc::new(Field::new_list_field(DataType::Utf8View, true));
3868        let mut builder = ListBuilder::new(StringViewBuilder::new()).with_field(item);
3869        builder.values().append_value("a");
3870        builder.values().append_null();
3871        builder.append(true);
3872        // The null parent list covers selective padding dropping values below
3873        // the list definition level while preserving the preceding item null.
3874        builder.append(false);
3875        // The long string covers the non-inlined Utf8View buffer path.
3876        builder.values().append_value("large payload over 12 bytes");
3877        builder.append(true);
3878
3879        one_column_roundtrip(Arc::new(builder.finish()), true);
3880    }
3881
3882    #[test]
3883    fn struct_single_column() {
3884        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3885        let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
3886        let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
3887
3888        let values = Arc::new(s);
3889        one_column_roundtrip(values, false);
3890    }
3891
3892    #[test]
3893    fn list_and_map_coerced_names() {
3894        // Create map and list with non-Parquet naming
3895        let list_field =
3896            Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
3897        let map_field = Field::new_map(
3898            "my_map",
3899            "entries",
3900            Field::new("keys", DataType::Int32, false),
3901            Field::new("values", DataType::Int32, true),
3902            false,
3903            true,
3904        );
3905
3906        let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
3907        let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();
3908
3909        let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
3910
3911        // Write data to Parquet but coerce names to match spec
3912        let props = Some(WriterProperties::builder().set_coerce_types(true).build());
3913        let file = tempfile::tempfile().unwrap();
3914        let mut writer =
3915            ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();
3916
3917        let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
3918        writer.write(&batch).unwrap();
3919        let file_metadata = writer.close().unwrap();
3920
3921        let schema = file_metadata.file_metadata().schema();
3922        // Coerced name of "item" should be "element"
3923        let list_field = &schema.get_fields()[0].get_fields()[0];
3924        assert_eq!(list_field.get_fields()[0].name(), "element");
3925
3926        let map_field = &schema.get_fields()[1].get_fields()[0];
3927        // Coerced name of "entries" should be "key_value"
3928        assert_eq!(map_field.name(), "key_value");
3929        // Coerced name of "keys" should be "key"
3930        assert_eq!(map_field.get_fields()[0].name(), "key");
3931        // Coerced name of "values" should be "value"
3932        assert_eq!(map_field.get_fields()[1].name(), "value");
3933
3934        // Double check schema after reading from the file
3935        let reader = SerializedFileReader::new(file).unwrap();
3936        let file_schema = reader.metadata().file_metadata().schema();
3937        let fields = file_schema.get_fields();
3938        let list_field = &fields[0].get_fields()[0];
3939        assert_eq!(list_field.get_fields()[0].name(), "element");
3940        let map_field = &fields[1].get_fields()[0];
3941        assert_eq!(map_field.name(), "key_value");
3942        assert_eq!(map_field.get_fields()[0].name(), "key");
3943        assert_eq!(map_field.get_fields()[1].name(), "value");
3944    }
3945
3946    #[test]
3947    fn fallback_flush_data_page() {
3948        //tests if the Fallback::flush_data_page clears all buffers correctly
3949        let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
3950        let values = Arc::new(StringArray::from(raw_values));
3951        let encodings = vec![
3952            Encoding::DELTA_BYTE_ARRAY,
3953            Encoding::DELTA_LENGTH_BYTE_ARRAY,
3954        ];
3955        let data_type = values.data_type().clone();
3956        let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
3957        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3958
3959        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
3960        let data_page_size_limit: usize = 32;
3961        let write_batch_size: usize = 16;
3962
3963        for encoding in &encodings {
3964            for row_group_size in row_group_sizes {
3965                let props = WriterProperties::builder()
3966                    .set_writer_version(WriterVersion::PARQUET_2_0)
3967                    .set_max_row_group_row_count(Some(row_group_size))
3968                    .set_dictionary_enabled(false)
3969                    .set_encoding(*encoding)
3970                    .set_data_page_size_limit(data_page_size_limit)
3971                    .set_write_batch_size(write_batch_size)
3972                    .build();
3973
3974                roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
3975                    let string_array_a = StringArray::from(a.clone());
3976                    let string_array_b = StringArray::from(b.clone());
3977                    let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
3978                    let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
3979                    assert_eq!(
3980                        vec_a, vec_b,
3981                        "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
3982                    );
3983                });
3984            }
3985        }
3986    }
3987
3988    #[test]
3989    fn arrow_writer_string_dictionary() {
3990        // define schema
3991        #[allow(deprecated)]
3992        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3993            "dictionary",
3994            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3995            true,
3996            42,
3997            true,
3998        )]));
3999
4000        // create some data
4001        let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
4002            .iter()
4003            .copied()
4004            .collect();
4005
4006        // build a record batch
4007        one_column_roundtrip_with_schema(Arc::new(d), schema);
4008    }
4009
4010    #[test]
4011    fn arrow_writer_test_type_compatibility() {
4012        fn ensure_compatible_write<T1, T2>(array1: T1, array2: T2, expected_result: T1)
4013        where
4014            T1: Array + 'static,
4015            T2: Array + 'static,
4016        {
4017            let schema1 = Arc::new(Schema::new(vec![Field::new(
4018                "a",
4019                array1.data_type().clone(),
4020                false,
4021            )]));
4022
4023            let file = tempfile().unwrap();
4024            let mut writer =
4025                ArrowWriter::try_new(file.try_clone().unwrap(), schema1.clone(), None).unwrap();
4026
4027            let rb1 = RecordBatch::try_new(schema1.clone(), vec![Arc::new(array1)]).unwrap();
4028            writer.write(&rb1).unwrap();
4029
4030            let schema2 = Arc::new(Schema::new(vec![Field::new(
4031                "a",
4032                array2.data_type().clone(),
4033                false,
4034            )]));
4035            let rb2 = RecordBatch::try_new(schema2, vec![Arc::new(array2)]).unwrap();
4036            writer.write(&rb2).unwrap();
4037
4038            writer.close().unwrap();
4039
4040            let mut record_batch_reader =
4041                ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
4042            let actual_batch = record_batch_reader.next().unwrap().unwrap();
4043
4044            let expected_batch =
4045                RecordBatch::try_new(schema1, vec![Arc::new(expected_result)]).unwrap();
4046            assert_eq!(actual_batch, expected_batch);
4047        }
4048
4049        // check compatibility between native and dictionaries
4050
4051        ensure_compatible_write(
4052            DictionaryArray::new(
4053                UInt8Array::from_iter_values(vec![0]),
4054                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
4055            ),
4056            StringArray::from_iter_values(vec!["barquet"]),
4057            DictionaryArray::new(
4058                UInt8Array::from_iter_values(vec![0, 1]),
4059                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
4060            ),
4061        );
4062
4063        ensure_compatible_write(
4064            StringArray::from_iter_values(vec!["parquet"]),
4065            DictionaryArray::new(
4066                UInt8Array::from_iter_values(vec![0]),
4067                Arc::new(StringArray::from_iter_values(vec!["barquet"])),
4068            ),
4069            StringArray::from_iter_values(vec!["parquet", "barquet"]),
4070        );
4071
4072        // check compatibility between dictionaries with different key types
4073
4074        ensure_compatible_write(
4075            DictionaryArray::new(
4076                UInt8Array::from_iter_values(vec![0]),
4077                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
4078            ),
4079            DictionaryArray::new(
4080                UInt16Array::from_iter_values(vec![0]),
4081                Arc::new(StringArray::from_iter_values(vec!["barquet"])),
4082            ),
4083            DictionaryArray::new(
4084                UInt8Array::from_iter_values(vec![0, 1]),
4085                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
4086            ),
4087        );
4088
4089        // check compatibility between dictionaries with different value types
4090        ensure_compatible_write(
4091            DictionaryArray::new(
4092                UInt8Array::from_iter_values(vec![0]),
4093                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
4094            ),
4095            DictionaryArray::new(
4096                UInt8Array::from_iter_values(vec![0]),
4097                Arc::new(LargeStringArray::from_iter_values(vec!["barquet"])),
4098            ),
4099            DictionaryArray::new(
4100                UInt8Array::from_iter_values(vec![0, 1]),
4101                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
4102            ),
4103        );
4104
4105        // check compatibility between a dictionary and a native array with a different type
4106        ensure_compatible_write(
4107            DictionaryArray::new(
4108                UInt8Array::from_iter_values(vec![0]),
4109                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
4110            ),
4111            LargeStringArray::from_iter_values(vec!["barquet"]),
4112            DictionaryArray::new(
4113                UInt8Array::from_iter_values(vec![0, 1]),
4114                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
4115            ),
4116        );
4117
4118        // check compatibility for string types
4119
4120        ensure_compatible_write(
4121            StringArray::from_iter_values(vec!["parquet"]),
4122            LargeStringArray::from_iter_values(vec!["barquet"]),
4123            StringArray::from_iter_values(vec!["parquet", "barquet"]),
4124        );
4125
4126        ensure_compatible_write(
4127            LargeStringArray::from_iter_values(vec!["parquet"]),
4128            StringArray::from_iter_values(vec!["barquet"]),
4129            LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
4130        );
4131
4132        ensure_compatible_write(
4133            StringArray::from_iter_values(vec!["parquet"]),
4134            StringViewArray::from_iter_values(vec!["barquet"]),
4135            StringArray::from_iter_values(vec!["parquet", "barquet"]),
4136        );
4137
4138        ensure_compatible_write(
4139            StringViewArray::from_iter_values(vec!["parquet"]),
4140            StringArray::from_iter_values(vec!["barquet"]),
4141            StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
4142        );
4143
4144        ensure_compatible_write(
4145            LargeStringArray::from_iter_values(vec!["parquet"]),
4146            StringViewArray::from_iter_values(vec!["barquet"]),
4147            LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
4148        );
4149
4150        ensure_compatible_write(
4151            StringViewArray::from_iter_values(vec!["parquet"]),
4152            LargeStringArray::from_iter_values(vec!["barquet"]),
4153            StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
4154        );
4155
4156        // check compatibility for binary types
4157
4158        ensure_compatible_write(
4159            BinaryArray::from_iter_values(vec![b"parquet"]),
4160            LargeBinaryArray::from_iter_values(vec![b"barquet"]),
4161            BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
4162        );
4163
4164        ensure_compatible_write(
4165            LargeBinaryArray::from_iter_values(vec![b"parquet"]),
4166            BinaryArray::from_iter_values(vec![b"barquet"]),
4167            LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
4168        );
4169
4170        ensure_compatible_write(
4171            BinaryArray::from_iter_values(vec![b"parquet"]),
4172            BinaryViewArray::from_iter_values(vec![b"barquet"]),
4173            BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
4174        );
4175
4176        ensure_compatible_write(
4177            BinaryViewArray::from_iter_values(vec![b"parquet"]),
4178            BinaryArray::from_iter_values(vec![b"barquet"]),
4179            BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
4180        );
4181
4182        ensure_compatible_write(
4183            BinaryViewArray::from_iter_values(vec![b"parquet"]),
4184            LargeBinaryArray::from_iter_values(vec![b"barquet"]),
4185            BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
4186        );
4187
4188        ensure_compatible_write(
4189            LargeBinaryArray::from_iter_values(vec![b"parquet"]),
4190            BinaryViewArray::from_iter_values(vec![b"barquet"]),
4191            LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
4192        );
4193
4194        // check compatibility for list types
4195
4196        let list_field_metadata = HashMap::from_iter(vec![(
4197            PARQUET_FIELD_ID_META_KEY.to_string(),
4198            "1".to_string(),
4199        )]);
4200        let list_field = Field::new_list_field(DataType::Int32, false);
4201
4202        let values1 = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4]));
4203        let offsets1 = OffsetBuffer::new(vec![0, 2, 5].into());
4204
4205        let values2 = Arc::new(Int32Array::from(vec![5, 6, 7, 8, 9]));
4206        let offsets2 = OffsetBuffer::new(vec![0, 3, 5].into());
4207
4208        let values_expected = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]));
4209        let offsets_expected = OffsetBuffer::new(vec![0, 2, 5, 8, 10].into());
4210
4211        ensure_compatible_write(
4212            // when the initial schema has the metadata ...
4213            ListArray::try_new(
4214                Arc::new(
4215                    list_field
4216                        .clone()
4217                        .with_metadata(list_field_metadata.clone()),
4218                ),
4219                offsets1,
4220                values1,
4221                None,
4222            )
4223            .unwrap(),
4224            // ... and some intermediate schema doesn't have the metadata
4225            ListArray::try_new(Arc::new(list_field.clone()), offsets2, values2, None).unwrap(),
4226            // ... the write will still go through, and the resulting schema will inherit the initial metadata
4227            ListArray::try_new(
4228                Arc::new(
4229                    list_field
4230                        .clone()
4231                        .with_metadata(list_field_metadata.clone()),
4232                ),
4233                offsets_expected,
4234                values_expected,
4235                None,
4236            )
4237            .unwrap(),
4238        );
4239    }
4240
4241    #[test]
4242    fn arrow_writer_primitive_dictionary() {
4243        // define schema
4244        #[allow(deprecated)]
4245        let schema = Arc::new(Schema::new(vec![Field::new_dict(
4246            "dictionary",
4247            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
4248            true,
4249            42,
4250            true,
4251        )]));
4252
4253        // create some data
4254        let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
4255        builder.append(12345678).unwrap();
4256        builder.append_null();
4257        builder.append(22345678).unwrap();
4258        builder.append(12345678).unwrap();
4259        let d = builder.finish();
4260
4261        one_column_roundtrip_with_schema(Arc::new(d), schema);
4262    }
4263
4264    #[test]
4265    fn arrow_writer_decimal32_dictionary() {
4266        let integers = vec![12345, 56789, 34567];
4267
4268        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
4269
4270        let values = Decimal32Array::from(integers.clone())
4271            .with_precision_and_scale(5, 2)
4272            .unwrap();
4273
4274        let array = DictionaryArray::new(keys, Arc::new(values));
4275        one_column_roundtrip(Arc::new(array.clone()), true);
4276
4277        let values = Decimal32Array::from(integers)
4278            .with_precision_and_scale(9, 2)
4279            .unwrap();
4280
4281        let array = array.with_values(Arc::new(values));
4282        one_column_roundtrip(Arc::new(array), true);
4283    }
4284
4285    #[test]
4286    fn arrow_writer_decimal64_dictionary() {
4287        let integers = vec![12345, 56789, 34567];
4288
4289        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
4290
4291        let values = Decimal64Array::from(integers.clone())
4292            .with_precision_and_scale(5, 2)
4293            .unwrap();
4294
4295        let array = DictionaryArray::new(keys, Arc::new(values));
4296        one_column_roundtrip(Arc::new(array.clone()), true);
4297
4298        let values = Decimal64Array::from(integers)
4299            .with_precision_and_scale(12, 2)
4300            .unwrap();
4301
4302        let array = array.with_values(Arc::new(values));
4303        one_column_roundtrip(Arc::new(array), true);
4304    }
4305
4306    #[test]
4307    fn arrow_writer_decimal128_dictionary() {
4308        let integers = vec![12345, 56789, 34567];
4309
4310        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
4311
4312        let values = Decimal128Array::from(integers.clone())
4313            .with_precision_and_scale(5, 2)
4314            .unwrap();
4315
4316        let array = DictionaryArray::new(keys, Arc::new(values));
4317        one_column_roundtrip(Arc::new(array.clone()), true);
4318
4319        let values = Decimal128Array::from(integers)
4320            .with_precision_and_scale(12, 2)
4321            .unwrap();
4322
4323        let array = array.with_values(Arc::new(values));
4324        one_column_roundtrip(Arc::new(array), true);
4325    }
4326
4327    #[test]
4328    fn arrow_writer_decimal256_dictionary() {
4329        let integers = vec![
4330            i256::from_i128(12345),
4331            i256::from_i128(56789),
4332            i256::from_i128(34567),
4333        ];
4334
4335        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
4336
4337        let values = Decimal256Array::from(integers.clone())
4338            .with_precision_and_scale(5, 2)
4339            .unwrap();
4340
4341        let array = DictionaryArray::new(keys, Arc::new(values));
4342        one_column_roundtrip(Arc::new(array.clone()), true);
4343
4344        let values = Decimal256Array::from(integers)
4345            .with_precision_and_scale(12, 2)
4346            .unwrap();
4347
4348        let array = array.with_values(Arc::new(values));
4349        one_column_roundtrip(Arc::new(array), true);
4350    }
4351
4352    #[test]
4353    fn arrow_writer_string_dictionary_unsigned_index() {
4354        // define schema
4355        #[allow(deprecated)]
4356        let schema = Arc::new(Schema::new(vec![Field::new_dict(
4357            "dictionary",
4358            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
4359            true,
4360            42,
4361            true,
4362        )]));
4363
4364        // create some data
4365        let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
4366            .iter()
4367            .copied()
4368            .collect();
4369
4370        one_column_roundtrip_with_schema(Arc::new(d), schema);
4371    }
4372
4373    #[test]
4374    fn u32_min_max() {
4375        // check values roundtrip through parquet
4376        let src = [
4377            u32::MIN,
4378            1,
4379            (i32::MAX as u32) - 1,
4380            i32::MAX as u32,
4381            (i32::MAX as u32) + 1,
4382            u32::MAX - 1,
4383            u32::MAX,
4384        ];
4385        let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
4386        let files = one_column_roundtrip(values, false);
4387
4388        for file in files {
4389            // check statistics are valid
4390            let reader = SerializedFileReader::new(file).unwrap();
4391            let metadata = reader.metadata();
4392
4393            let mut row_offset = 0;
4394            for row_group in metadata.row_groups() {
4395                assert_eq!(row_group.num_columns(), 1);
4396                let column = row_group.column(0);
4397
4398                let num_values = column.num_values() as usize;
4399                let src_slice = &src[row_offset..row_offset + num_values];
4400                row_offset += column.num_values() as usize;
4401
4402                let stats = column.statistics().unwrap();
4403                if let Statistics::Int32(stats) = stats {
4404                    assert_eq!(
4405                        *stats.min_opt().unwrap() as u32,
4406                        *src_slice.iter().min().unwrap()
4407                    );
4408                    assert_eq!(
4409                        *stats.max_opt().unwrap() as u32,
4410                        *src_slice.iter().max().unwrap()
4411                    );
4412                } else {
4413                    panic!("Statistics::Int32 missing")
4414                }
4415            }
4416        }
4417    }
4418
4419    #[test]
4420    fn u64_min_max() {
4421        // check values roundtrip through parquet
4422        let src = [
4423            u64::MIN,
4424            1,
4425            (i64::MAX as u64) - 1,
4426            i64::MAX as u64,
4427            (i64::MAX as u64) + 1,
4428            u64::MAX - 1,
4429            u64::MAX,
4430        ];
4431        let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
4432        let files = one_column_roundtrip(values, false);
4433
4434        for file in files {
4435            // check statistics are valid
4436            let reader = SerializedFileReader::new(file).unwrap();
4437            let metadata = reader.metadata();
4438
4439            let mut row_offset = 0;
4440            for row_group in metadata.row_groups() {
4441                assert_eq!(row_group.num_columns(), 1);
4442                let column = row_group.column(0);
4443
4444                let num_values = column.num_values() as usize;
4445                let src_slice = &src[row_offset..row_offset + num_values];
4446                row_offset += column.num_values() as usize;
4447
4448                let stats = column.statistics().unwrap();
4449                if let Statistics::Int64(stats) = stats {
4450                    assert_eq!(
4451                        *stats.min_opt().unwrap() as u64,
4452                        *src_slice.iter().min().unwrap()
4453                    );
4454                    assert_eq!(
4455                        *stats.max_opt().unwrap() as u64,
4456                        *src_slice.iter().max().unwrap()
4457                    );
4458                } else {
4459                    panic!("Statistics::Int64 missing")
4460                }
4461            }
4462        }
4463    }
4464
4465    #[test]
4466    fn statistics_null_counts_only_nulls() {
4467        // check that null-count statistics for "only NULL"-columns are correct
4468        let values = Arc::new(UInt64Array::from(vec![None, None]));
4469        let files = one_column_roundtrip(values, true);
4470
4471        for file in files {
4472            // check statistics are valid
4473            let reader = SerializedFileReader::new(file).unwrap();
4474            let metadata = reader.metadata();
4475            assert_eq!(metadata.num_row_groups(), 1);
4476            let row_group = metadata.row_group(0);
4477            assert_eq!(row_group.num_columns(), 1);
4478            let column = row_group.column(0);
4479            let stats = column.statistics().unwrap();
4480            assert_eq!(stats.null_count_opt(), Some(2));
4481        }
4482    }
4483
4484    #[test]
4485    fn test_list_of_struct_roundtrip() {
4486        // define schema
4487        let int_field = Field::new("a", DataType::Int32, true);
4488        let int_field2 = Field::new("b", DataType::Int32, true);
4489
4490        let int_builder = Int32Builder::with_capacity(10);
4491        let int_builder2 = Int32Builder::with_capacity(10);
4492
4493        let struct_builder = StructBuilder::new(
4494            vec![int_field, int_field2],
4495            vec![Box::new(int_builder), Box::new(int_builder2)],
4496        );
4497        let mut list_builder = ListBuilder::new(struct_builder);
4498
4499        // Construct the following array
4500        // [{a: 1, b: 2}], [], null, [null, null], [{a: null, b: 3}], [{a: 2, b: null}]
4501
4502        // [{a: 1, b: 2}]
4503        let values = list_builder.values();
4504        values
4505            .field_builder::<Int32Builder>(0)
4506            .unwrap()
4507            .append_value(1);
4508        values
4509            .field_builder::<Int32Builder>(1)
4510            .unwrap()
4511            .append_value(2);
4512        values.append(true);
4513        list_builder.append(true);
4514
4515        // []
4516        list_builder.append(true);
4517
4518        // null
4519        list_builder.append(false);
4520
4521        // [null, null]
4522        let values = list_builder.values();
4523        values
4524            .field_builder::<Int32Builder>(0)
4525            .unwrap()
4526            .append_null();
4527        values
4528            .field_builder::<Int32Builder>(1)
4529            .unwrap()
4530            .append_null();
4531        values.append(false);
4532        values
4533            .field_builder::<Int32Builder>(0)
4534            .unwrap()
4535            .append_null();
4536        values
4537            .field_builder::<Int32Builder>(1)
4538            .unwrap()
4539            .append_null();
4540        values.append(false);
4541        list_builder.append(true);
4542
4543        // [{a: null, b: 3}]
4544        let values = list_builder.values();
4545        values
4546            .field_builder::<Int32Builder>(0)
4547            .unwrap()
4548            .append_null();
4549        values
4550            .field_builder::<Int32Builder>(1)
4551            .unwrap()
4552            .append_value(3);
4553        values.append(true);
4554        list_builder.append(true);
4555
4556        // [{a: 2, b: null}]
4557        let values = list_builder.values();
4558        values
4559            .field_builder::<Int32Builder>(0)
4560            .unwrap()
4561            .append_value(2);
4562        values
4563            .field_builder::<Int32Builder>(1)
4564            .unwrap()
4565            .append_null();
4566        values.append(true);
4567        list_builder.append(true);
4568
4569        let array = Arc::new(list_builder.finish());
4570
4571        one_column_roundtrip(array, true);
4572    }
4573
4574    fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
4575        metadata.row_groups().iter().map(|x| x.num_rows()).collect()
4576    }
4577
4578    #[test]
4579    fn test_aggregates_records() {
4580        let arrays = [
4581            Int32Array::from((0..100).collect::<Vec<_>>()),
4582            Int32Array::from((0..50).collect::<Vec<_>>()),
4583            Int32Array::from((200..500).collect::<Vec<_>>()),
4584        ];
4585
4586        let schema = Arc::new(Schema::new(vec![Field::new(
4587            "int",
4588            ArrowDataType::Int32,
4589            false,
4590        )]));
4591
4592        let file = tempfile::tempfile().unwrap();
4593
4594        let props = WriterProperties::builder()
4595            .set_max_row_group_row_count(Some(200))
4596            .build();
4597
4598        let mut writer =
4599            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
4600
4601        for array in arrays {
4602            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
4603            writer.write(&batch).unwrap();
4604        }
4605
4606        writer.close().unwrap();
4607
4608        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4609        assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);
4610
4611        let batches = builder
4612            .with_batch_size(100)
4613            .build()
4614            .unwrap()
4615            .collect::<ArrowResult<Vec<_>>>()
4616            .unwrap();
4617
4618        assert_eq!(batches.len(), 5);
4619        assert!(batches.iter().all(|x| x.num_columns() == 1));
4620
4621        let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
4622
4623        assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
4624
4625        let values: Vec<_> = batches
4626            .iter()
4627            .flat_map(|x| {
4628                x.column(0)
4629                    .as_any()
4630                    .downcast_ref::<Int32Array>()
4631                    .unwrap()
4632                    .values()
4633                    .iter()
4634                    .cloned()
4635            })
4636            .collect();
4637
4638        let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
4639        assert_eq!(&values, &expected_values)
4640    }
4641
    #[test]
    fn complex_aggregate() {
        // Tests aggregating nested data
        //
        // Schema: non-nullable `struct_b` holding a nullable `list` of nullable `struct_a`
        // values, each with a non-nullable `leaf_a` and a nullable `leaf_b` Int32 leaf.
        //
        // Two batches of 5 and 2 rows are written with at most 6 rows per row group, so the
        // row group boundary falls inside the second batch.
        let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
        let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
        let struct_a = Arc::new(Field::new(
            "struct_a",
            DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
            true,
        ));

        let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
        let struct_b = Arc::new(Field::new(
            "struct_b",
            DataType::Struct(vec![list_a.clone()].into()),
            false,
        ));

        let schema = Arc::new(Schema::new(vec![struct_b]));

        // create nested data
        let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
        let field_b_array =
            Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);

        let struct_a_array = StructArray::from(vec![
            (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
            (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
        ]);

        // Batch 1: 5 lists with offsets [0, 1, 1, 3, 3, 5] and validity [T, F, T, F, T],
        // so rows 1 and 3 are null lists. The sixth struct value (leaf_a: 6) is not
        // referenced by any list and must not appear in the output.
        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
            .len(5)
            .add_buffer(Buffer::from_iter(vec![
                0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
            ]))
            .null_bit_buffer(Some(Buffer::from_iter(vec![
                true, false, true, false, true,
            ])))
            .child_data(vec![struct_a_array.into_data()])
            .build()
            .unwrap();

        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
        let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);

        let batch1 =
            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
                .unwrap();

        // Batch 2: 2 non-null lists with offsets [0, 4, 5] over 5 struct values
        let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
        let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);

        let struct_a_array = StructArray::from(vec![
            (field_a, Arc::new(field_a_array) as ArrayRef),
            (field_b, Arc::new(field_b_array) as ArrayRef),
        ]);

        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
            .len(2)
            .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
            .child_data(vec![struct_a_array.into_data()])
            .build()
            .unwrap();

        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
        let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);

        let batch2 =
            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
                .unwrap();

        let batches = &[batch1, batch2];

        // Verify data is as expected
        // (the same table is compared again after the parquet roundtrip below)

        let expected = r#"
            +-------------------------------------------------------------------------------------------------------+
            | struct_b                                                                                              |
            +-------------------------------------------------------------------------------------------------------+
            | {list: [{leaf_a: 1, leaf_b: 1}]}                                                                      |
            | {list: }                                                                                              |
            | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]}                                               |
            | {list: }                                                                                              |
            | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]}                                                |
            | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
            | {list: [{leaf_a: 10, leaf_b: }]}                                                                      |
            +-------------------------------------------------------------------------------------------------------+
        "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");

        let actual = pretty_format_batches(batches).unwrap().to_string();
        assert_eq!(actual, expected);

        // Write data
        let file = tempfile::tempfile().unwrap();
        let props = WriterProperties::builder()
            .set_max_row_group_row_count(Some(6))
            .build();

        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();

        for batch in batches {
            writer.write(batch).unwrap();
        }
        writer.close().unwrap();

        // Read Data
        // Should have written entire first batch and first row of second to the first row group
        // leaving a single row in the second row group

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);

        // 7 rows read back in batches of 2 yields batch sizes [2, 2, 2, 1]
        let batches = builder
            .with_batch_size(2)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(batches.len(), 4);
        let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
        assert_eq!(&batch_counts, &[2, 2, 2, 1]);

        let actual = pretty_format_batches(&batches).unwrap().to_string();
        assert_eq!(actual, expected);
    }
4769
4770    #[test]
4771    fn test_arrow_writer_metadata() {
4772        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
4773        let file_schema = batch_schema.clone().with_metadata(
4774            vec![("foo".to_string(), "bar".to_string())]
4775                .into_iter()
4776                .collect(),
4777        );
4778
4779        let batch = RecordBatch::try_new(
4780            Arc::new(batch_schema),
4781            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4782        )
4783        .unwrap();
4784
4785        let mut buf = Vec::with_capacity(1024);
4786        let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
4787        writer.write(&batch).unwrap();
4788        writer.close().unwrap();
4789    }
4790
4791    #[test]
4792    fn test_arrow_writer_nullable() {
4793        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
4794        let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
4795        let file_schema = Arc::new(file_schema);
4796
4797        let batch = RecordBatch::try_new(
4798            Arc::new(batch_schema),
4799            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4800        )
4801        .unwrap();
4802
4803        let mut buf = Vec::with_capacity(1024);
4804        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
4805        writer.write(&batch).unwrap();
4806        writer.close().unwrap();
4807
4808        let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
4809        let back = read.next().unwrap().unwrap();
4810        assert_eq!(back.schema(), file_schema);
4811        assert_ne!(back.schema(), batch.schema());
4812        assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
4813    }
4814
4815    #[test]
4816    fn in_progress_accounting() {
4817        // define schema
4818        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
4819
4820        // create some data
4821        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
4822
4823        // build a record batch
4824        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
4825
4826        let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
4827
4828        // starts empty
4829        assert_eq!(writer.in_progress_size(), 0);
4830        assert_eq!(writer.in_progress_rows(), 0);
4831        assert_eq!(writer.memory_size(), 0);
4832        assert_eq!(writer.bytes_written(), 4); // Initial header
4833        writer.write(&batch).unwrap();
4834
4835        // updated on write
4836        let initial_size = writer.in_progress_size();
4837        assert!(initial_size > 0);
4838        assert_eq!(writer.in_progress_rows(), 5);
4839        let initial_memory = writer.memory_size();
4840        assert!(initial_memory > 0);
4841        // memory estimate is larger than estimated encoded size
4842        assert!(
4843            initial_size <= initial_memory,
4844            "{initial_size} <= {initial_memory}"
4845        );
4846
4847        // updated on second write
4848        writer.write(&batch).unwrap();
4849        assert!(writer.in_progress_size() > initial_size);
4850        assert_eq!(writer.in_progress_rows(), 10);
4851        assert!(writer.memory_size() > initial_memory);
4852        assert!(
4853            writer.in_progress_size() <= writer.memory_size(),
4854            "in_progress_size {} <= memory_size {}",
4855            writer.in_progress_size(),
4856            writer.memory_size()
4857        );
4858
4859        // in progress tracking is cleared, but the overall data written is updated
4860        let pre_flush_bytes_written = writer.bytes_written();
4861        writer.flush().unwrap();
4862        assert_eq!(writer.in_progress_size(), 0);
4863        assert_eq!(writer.memory_size(), 0);
4864        assert!(writer.bytes_written() > pre_flush_bytes_written);
4865
4866        writer.close().unwrap();
4867    }
4868
4869    #[test]
4870    fn test_writer_all_null() {
4871        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
4872        let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
4873        let batch = RecordBatch::try_from_iter(vec![
4874            ("a", Arc::new(a) as ArrayRef),
4875            ("b", Arc::new(b) as ArrayRef),
4876        ])
4877        .unwrap();
4878
4879        let mut buf = Vec::with_capacity(1024);
4880        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
4881        writer.write(&batch).unwrap();
4882        writer.close().unwrap();
4883
4884        let bytes = Bytes::from(buf);
4885        let options = ReadOptionsBuilder::new().with_page_index().build();
4886        let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
4887        let index = reader.metadata().offset_index().unwrap();
4888
4889        assert_eq!(index.len(), 1);
4890        assert_eq!(index[0].len(), 2); // 2 columns
4891        assert_eq!(index[0][0].page_locations().len(), 1); // 1 page
4892        assert_eq!(index[0][1].page_locations().len(), 1); // 1 page
4893    }
4894
4895    #[test]
4896    fn test_disabled_statistics_with_page() {
4897        let file_schema = Schema::new(vec![
4898            Field::new("a", DataType::Utf8, true),
4899            Field::new("b", DataType::Utf8, true),
4900        ]);
4901        let file_schema = Arc::new(file_schema);
4902
4903        let batch = RecordBatch::try_new(
4904            file_schema.clone(),
4905            vec![
4906                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
4907                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
4908            ],
4909        )
4910        .unwrap();
4911
4912        let props = WriterProperties::builder()
4913            .set_statistics_enabled(EnabledStatistics::None)
4914            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
4915            .build();
4916
4917        let mut buf = Vec::with_capacity(1024);
4918        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
4919        writer.write(&batch).unwrap();
4920
4921        let metadata = writer.close().unwrap();
4922        assert_eq!(metadata.num_row_groups(), 1);
4923        let row_group = metadata.row_group(0);
4924        assert_eq!(row_group.num_columns(), 2);
4925        // Column "a" has both offset and column index, as requested
4926        assert!(row_group.column(0).offset_index_offset().is_some());
4927        assert!(row_group.column(0).column_index_offset().is_some());
4928        // Column "b" should only have offset index
4929        assert!(row_group.column(1).offset_index_offset().is_some());
4930        assert!(row_group.column(1).column_index_offset().is_none());
4931
4932        let options = ReadOptionsBuilder::new().with_page_index().build();
4933        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
4934
4935        let row_group = reader.get_row_group(0).unwrap();
4936        let a_col = row_group.metadata().column(0);
4937        let b_col = row_group.metadata().column(1);
4938
4939        // Column chunk of column "a" should have chunk level statistics
4940        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
4941            let min = byte_array_stats.min_opt().unwrap();
4942            let max = byte_array_stats.max_opt().unwrap();
4943
4944            assert_eq!(min.as_bytes(), b"a");
4945            assert_eq!(max.as_bytes(), b"d");
4946        } else {
4947            panic!("expecting Statistics::ByteArray");
4948        }
4949
4950        // The column chunk for column "b" shouldn't have statistics
4951        assert!(b_col.statistics().is_none());
4952
4953        let offset_index = reader.metadata().offset_index().unwrap();
4954        assert_eq!(offset_index.len(), 1); // 1 row group
4955        assert_eq!(offset_index[0].len(), 2); // 2 columns
4956
4957        let column_index = reader.metadata().column_index().unwrap();
4958        assert_eq!(column_index.len(), 1); // 1 row group
4959        assert_eq!(column_index[0].len(), 2); // 2 columns
4960
4961        let a_idx = &column_index[0][0];
4962        assert!(
4963            matches!(a_idx, ColumnIndexMetaData::BYTE_ARRAY(_)),
4964            "{a_idx:?}"
4965        );
4966        let b_idx = &column_index[0][1];
4967        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
4968    }
4969
4970    #[test]
4971    fn test_disabled_statistics_with_chunk() {
4972        let file_schema = Schema::new(vec![
4973            Field::new("a", DataType::Utf8, true),
4974            Field::new("b", DataType::Utf8, true),
4975        ]);
4976        let file_schema = Arc::new(file_schema);
4977
4978        let batch = RecordBatch::try_new(
4979            file_schema.clone(),
4980            vec![
4981                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
4982                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
4983            ],
4984        )
4985        .unwrap();
4986
4987        let props = WriterProperties::builder()
4988            .set_statistics_enabled(EnabledStatistics::None)
4989            .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
4990            .build();
4991
4992        let mut buf = Vec::with_capacity(1024);
4993        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
4994        writer.write(&batch).unwrap();
4995
4996        let metadata = writer.close().unwrap();
4997        assert_eq!(metadata.num_row_groups(), 1);
4998        let row_group = metadata.row_group(0);
4999        assert_eq!(row_group.num_columns(), 2);
5000        // Column "a" should only have offset index
5001        assert!(row_group.column(0).offset_index_offset().is_some());
5002        assert!(row_group.column(0).column_index_offset().is_none());
5003        // Column "b" should only have offset index
5004        assert!(row_group.column(1).offset_index_offset().is_some());
5005        assert!(row_group.column(1).column_index_offset().is_none());
5006
5007        let options = ReadOptionsBuilder::new().with_page_index().build();
5008        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
5009
5010        let row_group = reader.get_row_group(0).unwrap();
5011        let a_col = row_group.metadata().column(0);
5012        let b_col = row_group.metadata().column(1);
5013
5014        // Column chunk of column "a" should have chunk level statistics
5015        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
5016            let min = byte_array_stats.min_opt().unwrap();
5017            let max = byte_array_stats.max_opt().unwrap();
5018
5019            assert_eq!(min.as_bytes(), b"a");
5020            assert_eq!(max.as_bytes(), b"d");
5021        } else {
5022            panic!("expecting Statistics::ByteArray");
5023        }
5024
5025        // The column chunk for column "b"  shouldn't have statistics
5026        assert!(b_col.statistics().is_none());
5027
5028        let column_index = reader.metadata().column_index().unwrap();
5029        assert_eq!(column_index.len(), 1); // 1 row group
5030        assert_eq!(column_index[0].len(), 2); // 2 columns
5031
5032        let a_idx = &column_index[0][0];
5033        assert!(matches!(a_idx, ColumnIndexMetaData::NONE), "{a_idx:?}");
5034        let b_idx = &column_index[0][1];
5035        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
5036    }
5037
5038    #[test]
5039    fn test_arrow_writer_skip_metadata() {
5040        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
5041        let file_schema = Arc::new(batch_schema.clone());
5042
5043        let batch = RecordBatch::try_new(
5044            Arc::new(batch_schema),
5045            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
5046        )
5047        .unwrap();
5048        let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
5049
5050        let mut buf = Vec::with_capacity(1024);
5051        let mut writer =
5052            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
5053        writer.write(&batch).unwrap();
5054        writer.close().unwrap();
5055
5056        let bytes = Bytes::from(buf);
5057        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
5058        assert_eq!(file_schema, *reader_builder.schema());
5059        if let Some(key_value_metadata) = reader_builder
5060            .metadata()
5061            .file_metadata()
5062            .key_value_metadata()
5063        {
5064            assert!(
5065                !key_value_metadata
5066                    .iter()
5067                    .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY)
5068            );
5069        }
5070    }
5071
5072    #[test]
5073    fn test_arrow_writer_skip_path_in_schema() {
5074        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
5075        let file_schema = Arc::new(batch_schema.clone());
5076
5077        let batch = RecordBatch::try_new(
5078            Arc::new(batch_schema),
5079            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
5080        )
5081        .unwrap();
5082
5083        // default options should still write path_in_schema
5084        let skip_options = ArrowWriterOptions::new();
5085
5086        let mut buf = Vec::with_capacity(1024);
5087        let mut writer =
5088            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
5089        writer.write(&batch).unwrap();
5090        writer.close().unwrap();
5091
5092        // override to not write path_in_schema
5093        let skip_options = ArrowWriterOptions::new().with_properties(
5094            WriterProperties::builder()
5095                .set_write_path_in_schema(false)
5096                .build(),
5097        );
5098
5099        let mut buf2 = Vec::with_capacity(1024);
5100        let mut writer =
5101            ArrowWriter::try_new_with_options(&mut buf2, file_schema.clone(), skip_options)
5102                .unwrap();
5103        writer.write(&batch).unwrap();
5104        writer.close().unwrap();
5105
5106        // buf2 should be a bit smaller due to lack of path_in_schema
5107        assert!(buf.len() > buf2.len());
5108    }
5109
5110    #[test]
5111    fn mismatched_schemas() {
5112        let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
5113        let file_schema = Arc::new(Schema::new(vec![Field::new(
5114            "temperature",
5115            DataType::Float64,
5116            false,
5117        )]));
5118
5119        let batch = RecordBatch::try_new(
5120            Arc::new(batch_schema),
5121            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
5122        )
5123        .unwrap();
5124
5125        let mut buf = Vec::with_capacity(1024);
5126        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
5127
5128        let err = writer.write(&batch).unwrap_err().to_string();
5129        assert_eq!(
5130            err,
5131            "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
5132        );
5133    }
5134
5135    #[test]
5136    // https://github.com/apache/arrow-rs/issues/6988
5137    fn test_roundtrip_empty_schema() {
5138        // create empty record batch with empty schema
5139        let empty_batch = RecordBatch::try_new_with_options(
5140            Arc::new(Schema::empty()),
5141            vec![],
5142            &RecordBatchOptions::default().with_row_count(Some(0)),
5143        )
5144        .unwrap();
5145
5146        // write to parquet
5147        let mut parquet_bytes: Vec<u8> = Vec::new();
5148        let mut writer =
5149            ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
5150        writer.write(&empty_batch).unwrap();
5151        writer.close().unwrap();
5152
5153        // read from parquet
5154        let bytes = Bytes::from(parquet_bytes);
5155        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
5156        assert_eq!(reader.schema(), &empty_batch.schema());
5157        let batches: Vec<_> = reader
5158            .build()
5159            .unwrap()
5160            .collect::<ArrowResult<Vec<_>>>()
5161            .unwrap();
5162        assert_eq!(batches.len(), 0);
5163    }
5164
5165    #[test]
5166    fn test_page_stats_not_written_by_default() {
5167        let string_field = Field::new("a", DataType::Utf8, false);
5168        let schema = Schema::new(vec![string_field]);
5169        let raw_string_values = vec!["Blart Versenwald III"];
5170        let string_values = StringArray::from(raw_string_values.clone());
5171        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();
5172
5173        let props = WriterProperties::builder()
5174            .set_statistics_enabled(EnabledStatistics::Page)
5175            .set_dictionary_enabled(false)
5176            .set_encoding(Encoding::PLAIN)
5177            .set_compression(crate::basic::Compression::UNCOMPRESSED)
5178            .build();
5179
5180        let file = roundtrip_opts(&batch, props);
5181
5182        // read file and decode page headers
5183        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
5184
5185        // decode first page header
5186        let first_page = &file[4..];
5187        let mut prot = ThriftSliceInputProtocol::new(first_page);
5188        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
5189        let stats = hdr.data_page_header.unwrap().statistics;
5190
5191        assert!(stats.is_none());
5192    }
5193
5194    #[test]
5195    fn test_page_stats_when_enabled() {
5196        let string_field = Field::new("a", DataType::Utf8, false);
5197        let schema = Schema::new(vec![string_field]);
5198        let raw_string_values = vec!["Blart Versenwald III", "Andrew Lamb"];
5199        let string_values = StringArray::from(raw_string_values.clone());
5200        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();
5201
5202        let props = WriterProperties::builder()
5203            .set_statistics_enabled(EnabledStatistics::Page)
5204            .set_dictionary_enabled(false)
5205            .set_encoding(Encoding::PLAIN)
5206            .set_write_page_header_statistics(true)
5207            .set_compression(crate::basic::Compression::UNCOMPRESSED)
5208            .build();
5209
5210        let file = roundtrip_opts(&batch, props);
5211
5212        // read file and decode page headers
5213        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
5214
5215        // decode first page header
5216        let first_page = &file[4..];
5217        let mut prot = ThriftSliceInputProtocol::new(first_page);
5218        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
5219        let stats = hdr.data_page_header.unwrap().statistics;
5220
5221        let stats = stats.unwrap();
5222        // check that min/max were actually written to the page
5223        assert!(stats.is_max_value_exact.unwrap());
5224        assert!(stats.is_min_value_exact.unwrap());
5225        assert_eq!(stats.max_value.unwrap(), "Blart Versenwald III".as_bytes());
5226        assert_eq!(stats.min_value.unwrap(), "Andrew Lamb".as_bytes());
5227    }
5228
5229    #[test]
5230    fn test_page_stats_truncation() {
5231        let string_field = Field::new("a", DataType::Utf8, false);
5232        let binary_field = Field::new("b", DataType::Binary, false);
5233        let schema = Schema::new(vec![string_field, binary_field]);
5234
5235        let raw_string_values = vec!["Blart Versenwald III"];
5236        let raw_binary_values = [b"Blart Versenwald III".to_vec()];
5237        let raw_binary_value_refs = raw_binary_values
5238            .iter()
5239            .map(|x| x.as_slice())
5240            .collect::<Vec<_>>();
5241
5242        let string_values = StringArray::from(raw_string_values.clone());
5243        let binary_values = BinaryArray::from(raw_binary_value_refs);
5244        let batch = RecordBatch::try_new(
5245            Arc::new(schema),
5246            vec![Arc::new(string_values), Arc::new(binary_values)],
5247        )
5248        .unwrap();
5249
5250        let props = WriterProperties::builder()
5251            .set_statistics_truncate_length(Some(2))
5252            .set_dictionary_enabled(false)
5253            .set_encoding(Encoding::PLAIN)
5254            .set_write_page_header_statistics(true)
5255            .set_compression(crate::basic::Compression::UNCOMPRESSED)
5256            .build();
5257
5258        let file = roundtrip_opts(&batch, props);
5259
5260        // read file and decode page headers
5261        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
5262
5263        // decode first page header
5264        let first_page = &file[4..];
5265        let mut prot = ThriftSliceInputProtocol::new(first_page);
5266        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
5267        let stats = hdr.data_page_header.unwrap().statistics;
5268        assert!(stats.is_some());
5269        let stats = stats.unwrap();
5270        // check that min/max were properly truncated
5271        assert!(!stats.is_max_value_exact.unwrap());
5272        assert!(!stats.is_min_value_exact.unwrap());
5273        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
5274        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
5275
5276        // check second page now
5277        let second_page = &prot.as_slice()[hdr.compressed_page_size as usize..];
5278        let mut prot = ThriftSliceInputProtocol::new(second_page);
5279        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
5280        let stats = hdr.data_page_header.unwrap().statistics;
5281        assert!(stats.is_some());
5282        let stats = stats.unwrap();
5283        // check that min/max were properly truncated
5284        assert!(!stats.is_max_value_exact.unwrap());
5285        assert!(!stats.is_min_value_exact.unwrap());
5286        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
5287        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
5288    }
5289
5290    #[test]
5291    fn test_page_encoding_statistics_roundtrip() {
5292        let batch_schema = Schema::new(vec![Field::new(
5293            "int32",
5294            arrow_schema::DataType::Int32,
5295            false,
5296        )]);
5297
5298        let batch = RecordBatch::try_new(
5299            Arc::new(batch_schema.clone()),
5300            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
5301        )
5302        .unwrap();
5303
5304        let mut file: File = tempfile::tempfile().unwrap();
5305        let mut writer = ArrowWriter::try_new(&mut file, Arc::new(batch_schema), None).unwrap();
5306        writer.write(&batch).unwrap();
5307        let file_metadata = writer.close().unwrap();
5308
5309        assert_eq!(file_metadata.num_row_groups(), 1);
5310        assert_eq!(file_metadata.row_group(0).num_columns(), 1);
5311        assert!(
5312            file_metadata
5313                .row_group(0)
5314                .column(0)
5315                .page_encoding_stats()
5316                .is_some()
5317        );
5318        let chunk_page_stats = file_metadata
5319            .row_group(0)
5320            .column(0)
5321            .page_encoding_stats()
5322            .unwrap();
5323
5324        // check that the read metadata is also correct
5325        let options = ReadOptionsBuilder::new()
5326            .with_page_index()
5327            .with_encoding_stats_as_mask(false)
5328            .build();
5329        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
5330
5331        let rowgroup = reader.get_row_group(0).expect("row group missing");
5332        assert_eq!(rowgroup.num_columns(), 1);
5333        let column = rowgroup.metadata().column(0);
5334        assert!(column.page_encoding_stats().is_some());
5335        let file_page_stats = column.page_encoding_stats().unwrap();
5336        assert_eq!(chunk_page_stats, file_page_stats);
5337    }
5338
5339    #[test]
5340    fn test_different_dict_page_size_limit() {
5341        let array = Arc::new(Int64Array::from_iter(0..1024 * 1024));
5342        let schema = Arc::new(Schema::new(vec![
5343            Field::new("col0", arrow_schema::DataType::Int64, false),
5344            Field::new("col1", arrow_schema::DataType::Int64, false),
5345        ]));
5346        let batch =
5347            arrow_array::RecordBatch::try_new(schema.clone(), vec![array.clone(), array]).unwrap();
5348
5349        let props = WriterProperties::builder()
5350            .set_dictionary_page_size_limit(1024 * 1024)
5351            .set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 1024 * 1024 * 4)
5352            .build();
5353        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
5354        writer.write(&batch).unwrap();
5355        let data = Bytes::from(writer.into_inner().unwrap());
5356
5357        let mut metadata = ParquetMetaDataReader::new();
5358        metadata.try_parse(&data).unwrap();
5359        let metadata = metadata.finish().unwrap();
5360        let col0_meta = metadata.row_group(0).column(0);
5361        let col1_meta = metadata.row_group(0).column(1);
5362
5363        let get_dict_page_size = move |meta: &ColumnChunkMetaData| {
5364            let mut reader =
5365                SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
5366            let page = reader.get_next_page().unwrap().unwrap();
5367            match page {
5368                Page::DictionaryPage { buf, .. } => buf.len(),
5369                _ => panic!("expected DictionaryPage"),
5370            }
5371        };
5372
5373        assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
5374        assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
5375    }
5376
5377    #[test]
5378    fn test_arrow_writer_granular_mode_roundtrip() {
5379        // Granular mode subdivides chunks and writes more pages than the
5380        // default batched path. Make sure the data we write back is
5381        // bit-identical to what went in — page-count assertions elsewhere
5382        // only prove pages were cut, not that the encoded data is correct.
5383        //
5384        // Mix value sizes so that the cumulative-byte-budget cutoff
5385        // lands mid-chunk, exercising both batched and granular paths
5386        // within the same `write_batch_internal` call.
5387        let small = "tiny".to_string();
5388        let big = "x".repeat(64 * 1024);
5389        let strings: Vec<String> = (0..256)
5390            .map(|i| {
5391                if i % 16 == 0 {
5392                    big.clone()
5393                } else {
5394                    small.clone()
5395                }
5396            })
5397            .collect();
5398
5399        let schema = Arc::new(Schema::new(vec![Field::new(
5400            "col",
5401            ArrowDataType::Utf8,
5402            false,
5403        )]));
5404        let batch = RecordBatch::try_new(
5405            schema.clone(),
5406            vec![Arc::new(StringArray::from(strings.clone())) as _],
5407        )
5408        .unwrap();
5409
5410        let props = WriterProperties::builder()
5411            .set_dictionary_enabled(false)
5412            .set_data_page_size_limit(16 * 1024)
5413            .build();
5414        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
5415        writer.write(&batch).unwrap();
5416        let data = Bytes::from(writer.into_inner().unwrap());
5417
5418        let mut reader = ParquetRecordBatchReader::try_new(data, 1024).unwrap();
5419        let read = reader.next().unwrap().unwrap();
5420        assert!(reader.next().is_none(), "expected one batch");
5421        let col = read
5422            .column(0)
5423            .as_any()
5424            .downcast_ref::<StringArray>()
5425            .unwrap();
5426        assert_eq!(col.len(), strings.len());
5427        for (i, expected) in strings.iter().enumerate() {
5428            assert_eq!(
5429                col.value(i),
5430                expected.as_str(),
5431                "value mismatch at index {i}"
5432            );
5433        }
5434    }
5435
5436    #[test]
5437    fn test_arrow_writer_all_null_string_column() {
5438        // The `LevelDataRef::value_count` Uniform branch with
5439        // `value != max_def` (entirely-null chunk) must return 0 so the
5440        // sub-batch sizer short-circuits to batch mode without trying
5441        // to estimate byte budgets for non-existent values.
5442        let num_rows = 1024;
5443        let schema = Arc::new(Schema::new(vec![Field::new(
5444            "col",
5445            ArrowDataType::Utf8,
5446            true,
5447        )]));
5448        let nulls: Vec<Option<&str>> = vec![None; num_rows];
5449        let batch = RecordBatch::try_new(
5450            schema.clone(),
5451            vec![Arc::new(StringArray::from(nulls)) as _],
5452        )
5453        .unwrap();
5454
5455        let props = WriterProperties::builder()
5456            .set_dictionary_enabled(false)
5457            .set_data_page_size_limit(16 * 1024)
5458            .build();
5459        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
5460        writer.write(&batch).unwrap();
5461        let data = Bytes::from(writer.into_inner().unwrap());
5462
5463        // Re-parse the file: row group has one column, every row is
5464        // null, all data pages report `num_rows / page_count` rows.
5465        let mut metadata = ParquetMetaDataReader::new();
5466        metadata.try_parse(&data).unwrap();
5467        let metadata = metadata.finish().unwrap();
5468        let row_group = metadata.row_group(0);
5469        let col_meta = row_group.column(0);
5470        assert_eq!(row_group.num_rows() as usize, num_rows);
5471        // Statistics record `null_count = num_rows` — proves every value
5472        // was written as null.
5473        if let Some(stats) = col_meta.statistics() {
5474            assert_eq!(
5475                stats.null_count_opt().unwrap_or(0) as usize,
5476                num_rows,
5477                "expected all-null column to report null_count = num_rows"
5478            );
5479        }
5480
5481        let mut reader =
5482            SerializedPageReader::new(Arc::new(data.clone()), col_meta, num_rows, None).unwrap();
5483        let mut total_values = 0u32;
5484        while let Some(page) = reader.get_next_page().unwrap() {
5485            if matches!(page, Page::DataPage { .. } | Page::DataPageV2 { .. }) {
5486                total_values += page.num_values();
5487            }
5488        }
5489        assert_eq!(
5490            total_values as usize, num_rows,
5491            "expected every level position to be represented in some page"
5492        );
5493    }
5494
5495    struct WriteBatchesShape {
5496        num_batches: usize,
5497        rows_per_batch: usize,
5498        row_size: usize,
5499    }
5500
5501    /// Helper function to write batches with the provided `WriteBatchesShape` into an `ArrowWriter`
5502    fn write_batches(
5503        WriteBatchesShape {
5504            num_batches,
5505            rows_per_batch,
5506            row_size,
5507        }: WriteBatchesShape,
5508        props: WriterProperties,
5509    ) -> ParquetRecordBatchReaderBuilder<File> {
5510        let schema = Arc::new(Schema::new(vec![Field::new(
5511            "str",
5512            ArrowDataType::Utf8,
5513            false,
5514        )]));
5515        let file = tempfile::tempfile().unwrap();
5516        let mut writer =
5517            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
5518
5519        for batch_idx in 0..num_batches {
5520            let strings: Vec<String> = (0..rows_per_batch)
5521                .map(|i| format!("{:0>width$}", batch_idx * 10 + i, width = row_size))
5522                .collect();
5523            let array = StringArray::from(strings);
5524            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
5525            writer.write(&batch).unwrap();
5526        }
5527        writer.close().unwrap();
5528        ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
5529    }
5530
5531    #[test]
5532    // When both limits are None, all data should go into a single row group
5533    fn test_row_group_limit_none_writes_single_row_group() {
5534        let props = WriterProperties::builder()
5535            .set_max_row_group_row_count(None)
5536            .set_max_row_group_bytes(None)
5537            .build();
5538
5539        let builder = write_batches(
5540            WriteBatchesShape {
5541                num_batches: 1,
5542                rows_per_batch: 1000,
5543                row_size: 4,
5544            },
5545            props,
5546        );
5547
5548        assert_eq!(
5549            &row_group_sizes(builder.metadata()),
5550            &[1000],
5551            "With no limits, all rows should be in a single row group"
5552        );
5553    }
5554
5555    #[test]
5556    // When only max_row_group_size is set, respect the row limit
5557    fn test_row_group_limit_rows_only() {
5558        let props = WriterProperties::builder()
5559            .set_max_row_group_row_count(Some(300))
5560            .set_max_row_group_bytes(None)
5561            .build();
5562
5563        let builder = write_batches(
5564            WriteBatchesShape {
5565                num_batches: 1,
5566                rows_per_batch: 1000,
5567                row_size: 4,
5568            },
5569            props,
5570        );
5571
5572        assert_eq!(
5573            &row_group_sizes(builder.metadata()),
5574            &[300, 300, 300, 100],
5575            "Row groups should be split by row count"
5576        );
5577    }
5578
5579    #[test]
5580    // When only max_row_group_bytes is set, respect the byte limit
5581    fn test_row_group_limit_bytes_only() {
5582        let props = WriterProperties::builder()
5583            .set_max_row_group_row_count(None)
5584            // Set byte limit to approximately fit ~30 rows worth of data (~100 bytes each)
5585            .set_max_row_group_bytes(Some(3500))
5586            .build();
5587
5588        let builder = write_batches(
5589            WriteBatchesShape {
5590                num_batches: 10,
5591                rows_per_batch: 10,
5592                row_size: 100,
5593            },
5594            props,
5595        );
5596
5597        let sizes = row_group_sizes(builder.metadata());
5598
5599        assert!(
5600            sizes.len() > 1,
5601            "Should have multiple row groups due to byte limit, got {sizes:?}",
5602        );
5603
5604        let total_rows: i64 = sizes.iter().sum();
5605        assert_eq!(total_rows, 100, "Total rows should be preserved");
5606    }
5607
5608    #[test]
5609    // If an in-progress row group is already oversized, it should be flushed before writing more.
5610    fn test_row_group_limit_bytes_flushes_when_current_group_already_too_large() {
5611        let schema = Arc::new(Schema::new(vec![Field::new(
5612            "str",
5613            ArrowDataType::Utf8,
5614            false,
5615        )]));
5616        let file = tempfile::tempfile().unwrap();
5617
5618        // Start with no byte limit so we can intentionally build an oversized in-progress row group.
5619        let props = WriterProperties::builder()
5620            .set_max_row_group_row_count(None)
5621            .set_max_row_group_bytes(None)
5622            .build();
5623        let mut writer =
5624            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
5625
5626        let first_array = StringArray::from(
5627            (0..10)
5628                .map(|i| format!("{:0>100}", i))
5629                .collect::<Vec<String>>(),
5630        );
5631        let first_batch =
5632            RecordBatch::try_new(schema.clone(), vec![Arc::new(first_array)]).unwrap();
5633        writer.write(&first_batch).unwrap();
5634        assert_eq!(writer.in_progress_rows(), 10);
5635
5636        // Tighten the limit below the current in-progress bytes to exercise:
5637        // `if current_bytes >= max_bytes { self.flush()?; ... }`
5638        writer.max_row_group_bytes = Some(1);
5639
5640        let second_array = StringArray::from(vec!["x".to_string()]);
5641        let second_batch =
5642            RecordBatch::try_new(schema.clone(), vec![Arc::new(second_array)]).unwrap();
5643        writer.write(&second_batch).unwrap();
5644        writer.close().unwrap();
5645        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
5646
5647        assert_eq!(
5648            &row_group_sizes(builder.metadata()),
5649            &[10, 1],
5650            "The second write should flush an oversized in-progress row group first",
5651        );
5652    }
5653
5654    #[test]
5655    // When both limits are set, the row limit triggers first
5656    fn test_row_group_limit_both_row_wins_single_batch() {
5657        let props = WriterProperties::builder()
5658            .set_max_row_group_row_count(Some(200)) // Will trigger at 200 rows
5659            .set_max_row_group_bytes(Some(1024 * 1024)) // 1MB - won't trigger for small int data
5660            .build();
5661
5662        let builder = write_batches(
5663            WriteBatchesShape {
5664                num_batches: 1,
5665                row_size: 4,
5666                rows_per_batch: 1000,
5667            },
5668            props,
5669        );
5670
5671        assert_eq!(
5672            &row_group_sizes(builder.metadata()),
5673            &[200, 200, 200, 200, 200],
5674            "Row limit should trigger before byte limit"
5675        );
5676    }
5677
5678    #[test]
5679    // When both limits are set, the row limit triggers first
5680    fn test_row_group_limit_both_row_wins_multiple_batches() {
5681        let props = WriterProperties::builder()
5682            .set_max_row_group_row_count(Some(5)) // Will trigger every 5 rows
5683            .set_max_row_group_bytes(Some(9999)) // Won't trigger
5684            .build();
5685
5686        let builder = write_batches(
5687            WriteBatchesShape {
5688                num_batches: 10,
5689                rows_per_batch: 10,
5690                row_size: 100,
5691            },
5692            props,
5693        );
5694
5695        assert_eq!(
5696            &row_group_sizes(builder.metadata()),
5697            &[5; 20],
5698            "Row limit should trigger before byte limit"
5699        );
5700    }
5701
5702    #[test]
5703    // When both limits are set, the byte limit triggers first
5704    fn test_row_group_limit_both_bytes_wins() {
5705        let props = WriterProperties::builder()
5706            .set_max_row_group_row_count(Some(1000)) // Won't trigger for 100 rows
5707            .set_max_row_group_bytes(Some(3500)) // Will trigger at ~30-35 rows
5708            .build();
5709
5710        let builder = write_batches(
5711            WriteBatchesShape {
5712                num_batches: 10,
5713                rows_per_batch: 10,
5714                row_size: 100,
5715            },
5716            props,
5717        );
5718
5719        let sizes = row_group_sizes(builder.metadata());
5720
5721        assert!(
5722            sizes.len() > 1,
5723            "Byte limit should trigger before row limit, got {sizes:?}",
5724        );
5725
5726        assert!(
5727            sizes.iter().all(|&s| s < 1000),
5728            "No row group should hit the row limit"
5729        );
5730
5731        let total_rows: i64 = sizes.iter().sum();
5732        assert_eq!(total_rows, 100, "Total rows should be preserved");
5733    }
5734
5735    #[test]
5736    fn arrow_column_chunk_close_mut_drops_column_index() {
5737        use crate::arrow::ArrowSchemaConverter;
5738        use crate::file::writer::SerializedFileWriter;
5739
5740        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
5741        let props = Arc::new(
5742            WriterProperties::builder()
5743                .set_statistics_enabled(EnabledStatistics::Page)
5744                .build(),
5745        );
5746        let parquet_schema = ArrowSchemaConverter::new()
5747            .with_coerce_types(props.coerce_types())
5748            .convert(&schema)
5749            .unwrap();
5750
5751        let mut buf = Vec::with_capacity(1024);
5752        let mut writer =
5753            SerializedFileWriter::new(&mut buf, parquet_schema.root_schema_ptr(), props.clone())
5754                .unwrap();
5755
5756        let factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(&schema));
5757        let mut col_writers = factory.create_column_writers(0).unwrap();
5758        let arr: ArrayRef = Arc::new(Int32Array::from_iter_values(0..64));
5759        for leaves in compute_leaves(schema.field(0), &arr).unwrap() {
5760            col_writers[0].write(&leaves).unwrap();
5761        }
5762        let mut chunk = col_writers.pop().unwrap().close().unwrap();
5763
5764        // Immutable accessor exposes the close result produced at close time.
5765        assert!(
5766            chunk.close().column_index.is_some(),
5767            "EnabledStatistics::Page should produce a column_index"
5768        );
5769
5770        // Mutable accessor lets callers drop the page-level index before append.
5771        chunk.close_mut().column_index = None;
5772        assert!(chunk.close().column_index.is_none());
5773
5774        let mut rg = writer.next_row_group().unwrap();
5775        chunk.append_to_row_group(&mut rg).unwrap();
5776        rg.close().unwrap();
5777        let file_meta = writer.close().unwrap();
5778
5779        // After dropping column_index, the resulting file records no column
5780        // index offset/length for this chunk.
5781        let cc = file_meta.row_group(0).column(0);
5782        assert!(cc.column_index_range().is_none());
5783    }
5784
5785    /// Writes a single-column RecordBatch to an in-memory Parquet buffer.
5786    fn write_column_to_bytes(array: ArrayRef) -> Bytes {
5787        let schema = Arc::new(Schema::new(vec![Field::new(
5788            "col",
5789            array.data_type().clone(),
5790            true,
5791        )]));
5792        let buf = get_bytes_after_close(
5793            schema.clone(),
5794            &RecordBatch::try_new(schema, vec![array]).unwrap(),
5795        );
5796        Bytes::from(buf)
5797    }
5798
5799    /// Reads column 0 from a single-row-group Parquet buffer, projecting it with the given schema.
5800    /// Passing a flat schema when the buffer was written from a REE array lets callers decode
5801    /// the physical values without the run-end encoding wrapper.
5802    fn read_column_with_schema(bytes: Bytes, schema: SchemaRef) -> ArrayRef {
5803        let opts = crate::arrow::arrow_reader::ArrowReaderOptions::new().with_schema(schema);
5804        ParquetRecordBatchReaderBuilder::try_new_with_options(bytes, opts)
5805            .unwrap()
5806            .build()
5807            .unwrap()
5808            .next()
5809            .unwrap()
5810            .unwrap()
5811            .column(0)
5812            .clone()
5813    }
5814
5815    fn ree_write_read_roundtrip(ree: ArrayRef, flat: ArrayRef) {
5816        let flat_schema = Arc::new(Schema::new(vec![Field::new(
5817            "col",
5818            flat.data_type().clone(),
5819            true,
5820        )]));
5821        let ree_bytes = write_column_to_bytes(ree);
5822        let flat_bytes = write_column_to_bytes(flat.clone());
5823        assert_eq!(
5824            ree_bytes, flat_bytes,
5825            "REE and flat bytes should be identical"
5826        );
5827
5828        let decoded_ree = read_column_with_schema(ree_bytes, flat_schema.clone());
5829        let decoded_flat = read_column_with_schema(flat_bytes, flat_schema);
5830
5831        assert_eq!(decoded_ree.as_ref(), flat.as_ref());
5832        assert_eq!(decoded_ree.as_ref(), decoded_flat.as_ref());
5833    }
5834
5835    #[test]
5836    fn ree_string() {
5837        let ree: ArrayRef = Arc::new(
5838            [Some("a"), Some("a"), None, Some("b"), Some("b")]
5839                .into_iter()
5840                .collect::<Int32RunArray>(),
5841        );
5842        let flat: ArrayRef = Arc::new(StringArray::from(vec![
5843            Some("a"),
5844            Some("a"),
5845            None,
5846            Some("b"),
5847            Some("b"),
5848        ]));
5849        ree_write_read_roundtrip(ree, flat);
5850    }
5851
5852    #[test]
5853    fn ree_int32() {
5854        let mut b = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
5855        for v in [Some(1), Some(1), None, Some(2), Some(2)] {
5856            b.append_option(v);
5857        }
5858        let ree: ArrayRef = Arc::new(b.finish());
5859        let flat: ArrayRef = Arc::new(Int32Array::from(vec![
5860            Some(1),
5861            Some(1),
5862            None,
5863            Some(2),
5864            Some(2),
5865        ]));
5866        ree_write_read_roundtrip(ree, flat);
5867    }
5868
5869    #[test]
5870    fn ree_bool() {
5871        // run_ends [3, 5, 7] → [T,T,T, null,null, F,F]
5872        let ree: ArrayRef = Arc::new(
5873            RunArray::try_new(
5874                &Int32Array::from(vec![3, 5, 7]),
5875                &BooleanArray::from(vec![Some(true), None, Some(false)]),
5876            )
5877            .unwrap(),
5878        );
5879        let flat: ArrayRef = Arc::new(BooleanArray::from(vec![
5880            Some(true),
5881            Some(true),
5882            Some(true),
5883            None,
5884            None,
5885            Some(false),
5886            Some(false),
5887        ]));
5888        ree_write_read_roundtrip(ree, flat);
5889    }
5890
5891    #[test]
5892    fn ree_fixed_size_binary() {
5893        let mk = |vals: &[Option<&[u8]>]| -> FixedSizeBinaryArray {
5894            let mut b = FixedSizeBinaryBuilder::new(2);
5895            for v in vals {
5896                match v {
5897                    Some(x) => b.append_value(x).unwrap(),
5898                    None => b.append_null(),
5899                }
5900            }
5901            b.finish()
5902        };
5903        // run_ends [2, 4, 6] → [aa,aa, null,null, bb,bb]
5904        let ree: ArrayRef = Arc::new(
5905            RunArray::try_new(
5906                &Int32Array::from(vec![2, 4, 6]),
5907                &mk(&[Some(b"aa"), None, Some(b"bb")]),
5908            )
5909            .unwrap(),
5910        );
5911        let flat: ArrayRef = Arc::new(mk(&[
5912            Some(b"aa"),
5913            Some(b"aa"),
5914            None,
5915            None,
5916            Some(b"bb"),
5917            Some(b"bb"),
5918        ]));
5919        ree_write_read_roundtrip(ree, flat);
5920    }
5921
5922    #[test]
5923    fn ree_single_run() {
5924        let ree: ArrayRef = Arc::new(["x", "x", "x"].into_iter().collect::<Int32RunArray>());
5925        let flat: ArrayRef = Arc::new(StringArray::from(vec!["x", "x", "x"]));
5926        ree_write_read_roundtrip(ree, flat);
5927    }
5928
5929    #[test]
5930    fn ree_float32() {
5931        // run_ends [2, 4, 5] → [1.0, 1.0, null, null, 2.5]
5932        let ree: ArrayRef = Arc::new(
5933            RunArray::try_new(
5934                &Int32Array::from(vec![2, 4, 5]),
5935                &Float32Array::from(vec![Some(1.0_f32), None, Some(2.5_f32)]),
5936            )
5937            .unwrap(),
5938        );
5939        let flat: ArrayRef = Arc::new(Float32Array::from(vec![
5940            Some(1.0_f32),
5941            Some(1.0_f32),
5942            None,
5943            None,
5944            Some(2.5_f32),
5945        ]));
5946        ree_write_read_roundtrip(ree, flat);
5947    }
5948
5949    #[test]
5950    fn ree_sliced() {
5951        // A sliced (non-zero offset) REE array: verify that get_physical_index
5952        // correctly accounts for the logical offset when expanding.
5953        // Full array: run_ends [3, 5, 7] → [a,a,a, b,b, c,c]
5954        // After slice(2, 5) the logical view is [a, b, b, c, c].
5955        let full: ArrayRef = Arc::new(
5956            RunArray::try_new(
5957                &Int32Array::from(vec![3, 5, 7]),
5958                &StringArray::from(vec!["a", "b", "c"]),
5959            )
5960            .unwrap(),
5961        );
5962        let sliced = full.slice(2, 5);
5963        let flat: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "b", "c", "c"]));
5964        ree_write_read_roundtrip(sliced, flat);
5965    }
5966
5967    #[test]
5968    fn ree_struct_with_ree_child() {
5969        // Struct with a REE string field and a REE int field — confirms
5970        // recursion visits every child and each collapses to the right leaf type.
5971        let run_ends = Int32Array::from(vec![2i32, 3, 5]);
5972
5973        let col_a: ArrayRef = Arc::new(
5974            RunArray::try_new(
5975                &run_ends,
5976                &StringArray::from(vec![Some("foo"), None, Some("bar")]),
5977            )
5978            .unwrap(),
5979        );
5980        let col_b: ArrayRef = Arc::new(
5981            RunArray::try_new(&run_ends, &Int32Array::from(vec![Some(1), None, Some(2)])).unwrap(),
5982        );
5983
5984        let struct_array: ArrayRef = Arc::new(StructArray::new(
5985            Fields::from(vec![
5986                Field::new("a", col_a.data_type().clone(), true),
5987                Field::new("b", col_b.data_type().clone(), true),
5988            ]),
5989            vec![col_a, col_b],
5990            None,
5991        ));
5992
5993        let schema = Arc::new(Schema::new(vec![Field::new(
5994            "row",
5995            struct_array.data_type().clone(),
5996            true,
5997        )]));
5998        let batch = RecordBatch::try_new(schema.clone(), vec![struct_array]).unwrap();
5999
6000        let mut buf = Vec::new();
6001        let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap();
6002        writer.write(&batch).unwrap();
6003        let metadata = writer.close().unwrap();
6004
6005        let parquet_schema = metadata.file_metadata().schema_descr();
6006        assert_eq!(parquet_schema.num_columns(), 2);
6007        assert_eq!(
6008            parquet_schema.column(0).physical_type(),
6009            crate::basic::Type::BYTE_ARRAY
6010        );
6011        assert_eq!(parquet_schema.column(0).path().string(), "row.a");
6012        assert_eq!(
6013            parquet_schema.column(1).physical_type(),
6014            crate::basic::Type::INT32
6015        );
6016        assert_eq!(parquet_schema.column(1).path().string(), "row.b");
6017    }
6018}