Skip to main content

parquet/arrow/arrow_writer/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains writer which writes arrow data into parquet data.
19
20use crate::column::chunker::ContentDefinedChunker;
21
22use bytes::Bytes;
23use std::io::{Read, Write};
24use std::slice::Iter;
25use std::sync::{Arc, Mutex};
26use std::vec::IntoIter;
27
28use arrow_array::cast::AsArray;
29use arrow_array::types::*;
30use arrow_array::{ArrayRef, Int32Array, RecordBatch, RecordBatchWriter};
31use arrow_schema::{
32    ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef, TimeUnit,
33};
34
35use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};
36
37use crate::arrow::ArrowSchemaConverter;
38use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
39use crate::basic::PageType;
40use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
41use crate::column::page_encryption::PageEncryptor;
42use crate::column::writer::encoder::ColumnValueEncoder;
43use crate::column::writer::{
44    ColumnCloseResult, ColumnWriter, GenericColumnWriter, get_column_writer,
45};
46use crate::data_type::{ByteArray, FixedLenByteArray};
47#[cfg(feature = "encryption")]
48use crate::encryption::encrypt::FileEncryptor;
49use crate::errors::{ParquetError, Result};
50use crate::file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaData};
51use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
52use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
53use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
54use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
55use levels::{ArrayLevels, calculate_array_levels};
56
57mod byte_array;
58mod levels;
59
60#[doc(inline)]
61pub use crate::column::page_store::{
62    InMemoryPageStore, InMemoryPageStoreFactory, PageKey, PageStore, PageStoreArgs,
63    PageStoreFactory,
64};
65
66/// Encodes [`RecordBatch`] to parquet
67///
68/// Writes Arrow `RecordBatch`es to a Parquet writer. Multiple [`RecordBatch`] will be encoded
69/// to the same row group, up to `max_row_group_size` rows. Any remaining rows will be
70/// flushed on close, leading the final row group in the output file to potentially
71/// contain fewer than `max_row_group_size` rows
72///
73/// # Example: Writing `RecordBatch`es
74/// ```
75/// # use std::sync::Arc;
76/// # use bytes::Bytes;
77/// # use arrow_array::{ArrayRef, Int64Array};
78/// # use arrow_array::RecordBatch;
79/// # use parquet::arrow::arrow_writer::ArrowWriter;
80/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
81/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
82/// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
83///
84/// let mut buffer = Vec::new();
85/// let mut writer = ArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
86/// writer.write(&to_write).unwrap();
87/// writer.close().unwrap();
88///
89/// let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap();
90/// let read = reader.next().unwrap().unwrap();
91///
92/// assert_eq!(to_write, read);
93/// ```
94///
95/// # Memory Usage and Limiting
96///
97/// The nature of Parquet requires buffering of an entire row group before it can
98/// be flushed to the underlying writer. Data is mostly buffered in its encoded
99/// form, reducing memory usage. However, some data such as dictionary keys,
100/// large strings or very nested data may still result in non-trivial memory
101/// usage.
102///
103/// See Also:
104/// * [`ArrowWriter::memory_size`]: the current memory usage of the writer.
105/// * [`ArrowWriter::in_progress_size`]: Estimated size of the buffered row group,
106///
107/// Call [`Self::flush`] to trigger an early flush of a row group based on a
108/// memory threshold and/or global memory pressure. However,  smaller row groups
109/// result in higher metadata overheads, and thus may worsen compression ratios
110/// and query performance.
111///
112/// ```no_run
113/// # use std::io::Write;
114/// # use arrow_array::RecordBatch;
115/// # use parquet::arrow::ArrowWriter;
116/// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
117/// # let batch: RecordBatch = todo!();
118/// writer.write(&batch).unwrap();
119/// // Trigger an early flush if anticipated size exceeds 1_000_000
120/// if writer.in_progress_size() > 1_000_000 {
121///     writer.flush().unwrap();
122/// }
123/// ```
124///
125/// ## Type Support
126///
127/// The writer supports writing all Arrow [`DataType`]s that have a direct mapping to
128/// Parquet types including  [`StructArray`] and [`ListArray`].
129///
130/// The following are not supported:
131///
132/// * [`IntervalMonthDayNanoArray`]: Parquet does not [support nanosecond intervals].
133///
134/// [`DataType`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html
135/// [`StructArray`]: https://docs.rs/arrow/latest/arrow/array/struct.StructArray.html
136/// [`ListArray`]: https://docs.rs/arrow/latest/arrow/array/type.ListArray.html
137/// [`IntervalMonthDayNanoArray`]: https://docs.rs/arrow/latest/arrow/array/type.IntervalMonthDayNanoArray.html
138/// [support nanosecond intervals]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
139///
140/// ## Type Compatibility
141/// The writer can write Arrow [`RecordBatch`]s that are logically equivalent. This means that for
142/// a  given column, the writer can accept multiple Arrow [`DataType`]s that contain the same
143/// value type.
144///
145/// For example, the following [`DataType`]s are all logically equivalent and can be written
146/// to the same column:
147/// * String, LargeString, StringView
148/// * Binary, LargeBinary, BinaryView
149///
150/// The writer can will also accept both native and dictionary encoded arrays if the dictionaries
151/// contain compatible values.
152/// ```
153/// # use std::sync::Arc;
154/// # use arrow_array::{DictionaryArray, LargeStringArray, RecordBatch, StringArray, UInt8Array};
155/// # use arrow_schema::{DataType, Field, Schema};
156/// # use parquet::arrow::arrow_writer::ArrowWriter;
157/// let record_batch1 = RecordBatch::try_new(
158///    Arc::new(Schema::new(vec![Field::new("col", DataType::LargeUtf8, false)])),
159///    vec![Arc::new(LargeStringArray::from_iter_values(vec!["a", "b"]))]
160///  )
161/// .unwrap();
162///
163/// let mut buffer = Vec::new();
164/// let mut writer = ArrowWriter::try_new(&mut buffer, record_batch1.schema(), None).unwrap();
165/// writer.write(&record_batch1).unwrap();
166///
167/// let record_batch2 = RecordBatch::try_new(
168///     Arc::new(Schema::new(vec![Field::new(
169///         "col",
170///         DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
171///          false,
172///     )])),
173///     vec![Arc::new(DictionaryArray::new(
174///          UInt8Array::from_iter_values(vec![0, 1]),
175///          Arc::new(StringArray::from_iter_values(vec!["b", "c"])),
176///      ))],
177///  )
178///  .unwrap();
179///  writer.write(&record_batch2).unwrap();
180///  writer.close();
181/// ```
182pub struct ArrowWriter<W: Write> {
183    /// Underlying Parquet writer
184    writer: SerializedFileWriter<W>,
185
186    /// The in-progress row group if any
187    in_progress: Option<ArrowRowGroupWriter>,
188
189    /// A copy of the Arrow schema.
190    ///
191    /// The schema is used to verify that each record batch written has the correct schema
192    arrow_schema: SchemaRef,
193
194    /// Creates new [`ArrowRowGroupWriter`] instances as required
195    row_group_writer_factory: ArrowRowGroupWriterFactory,
196
197    /// The maximum number of rows to write to each row group, or None for unlimited
198    max_row_group_row_count: Option<usize>,
199
200    /// The maximum size in bytes for a row group, or None for unlimited
201    max_row_group_bytes: Option<usize>,
202
203    /// CDC chunkers persisted across row groups (one per leaf column).
204    cdc_chunkers: Option<Vec<ContentDefinedChunker>>,
205}
206
207impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
208    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209        let buffered_memory = self.in_progress_size();
210        f.debug_struct("ArrowWriter")
211            .field("writer", &self.writer)
212            .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
213            .field("in_progress_rows", &self.in_progress_rows())
214            .field("arrow_schema", &self.arrow_schema)
215            .field("max_row_group_row_count", &self.max_row_group_row_count)
216            .field("max_row_group_bytes", &self.max_row_group_bytes)
217            .finish()
218    }
219}
220
221impl<W: Write + Send> ArrowWriter<W> {
222    /// Try to create a new Arrow writer
223    ///
224    /// The writer will fail if:
225    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
226    ///  * the Arrow schema contains unsupported datatypes such as Unions
227    pub fn try_new(
228        writer: W,
229        arrow_schema: SchemaRef,
230        props: Option<WriterProperties>,
231    ) -> Result<Self> {
232        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
233        Self::try_new_with_options(writer, arrow_schema, options)
234    }
235
236    /// Try to create a new Arrow writer with [`ArrowWriterOptions`].
237    ///
238    /// The writer will fail if:
239    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
240    ///  * the Arrow schema contains unsupported datatypes such as Unions
241    pub fn try_new_with_options(
242        writer: W,
243        arrow_schema: SchemaRef,
244        options: ArrowWriterOptions,
245    ) -> Result<Self> {
246        let mut props = options.properties;
247
248        let schema = if let Some(parquet_schema) = options.schema_descr {
249            parquet_schema.clone()
250        } else {
251            let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
252            if let Some(schema_root) = &options.schema_root {
253                converter = converter.schema_root(schema_root);
254            }
255
256            converter.convert(&arrow_schema)?
257        };
258
259        if !options.skip_arrow_metadata {
260            // add serialized arrow schema
261            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
262        }
263
264        let max_row_group_row_count = props.max_row_group_row_count();
265        let max_row_group_bytes = props.max_row_group_bytes();
266
267        let props_ptr = Arc::new(props);
268        let file_writer =
269            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;
270
271        let mut row_group_writer_factory =
272            ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone());
273        if let Some(page_store_factory) = options.page_store_factory {
274            row_group_writer_factory =
275                row_group_writer_factory.with_page_store_factory(page_store_factory);
276        }
277
278        let cdc_chunkers = props_ptr
279            .content_defined_chunking()
280            .map(|opts| {
281                file_writer
282                    .schema_descr()
283                    .columns()
284                    .iter()
285                    .map(|desc| ContentDefinedChunker::new(desc, opts))
286                    .collect::<Result<Vec<_>>>()
287            })
288            .transpose()?;
289
290        Ok(Self {
291            writer: file_writer,
292            in_progress: None,
293            arrow_schema,
294            row_group_writer_factory,
295            max_row_group_row_count,
296            max_row_group_bytes,
297            cdc_chunkers,
298        })
299    }
300
301    /// Returns metadata for any flushed row groups
302    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
303        self.writer.flushed_row_groups()
304    }
305
306    /// Estimated memory usage, in bytes, of this `ArrowWriter`
307    ///
308    /// This estimate is formed bu summing the values of
309    /// [`ArrowColumnWriter::memory_size`] all in progress columns.
310    pub fn memory_size(&self) -> usize {
311        match &self.in_progress {
312            Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
313            None => 0,
314        }
315    }
316
317    /// Anticipated encoded size of the in progress row group.
318    ///
319    /// This estimate the row group size after being completely encoded is,
320    /// formed by summing the values of
321    /// [`ArrowColumnWriter::get_estimated_total_bytes`] for all in progress
322    /// columns.
323    pub fn in_progress_size(&self) -> usize {
324        match &self.in_progress {
325            Some(in_progress) => in_progress
326                .writers
327                .iter()
328                .map(|x| x.get_estimated_total_bytes())
329                .sum(),
330            None => 0,
331        }
332    }
333
334    /// Returns the number of rows buffered in the in progress row group
335    pub fn in_progress_rows(&self) -> usize {
336        self.in_progress
337            .as_ref()
338            .map(|x| x.buffered_rows)
339            .unwrap_or_default()
340    }
341
342    /// Returns the number of bytes written by this instance
343    pub fn bytes_written(&self) -> usize {
344        self.writer.bytes_written()
345    }
346
347    /// Encodes the provided [`RecordBatch`]
348    ///
349    /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_row_count`]
350    /// rows or [`WriterProperties::max_row_group_bytes`] bytes, the contents of `batch` will be
351    /// written to one or more row groups such that limits are respected.
352    ///
353    /// If both limits are `None`, all data is written to a single row group.
354    /// If one limit is set, that limit is respected.
355    /// If both limits are set, the lower bound (whichever triggers first) is respected.
356    ///
357    /// This will fail if the `batch`'s schema does not match the writer's schema.
358    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
359        if batch.num_rows() == 0 {
360            return Ok(());
361        }
362
363        let in_progress = match &mut self.in_progress {
364            Some(in_progress) => in_progress,
365            x => x.insert(
366                self.row_group_writer_factory
367                    .create_row_group_writer(self.writer.flushed_row_groups().len())?,
368            ),
369        };
370
371        if let Some(max_rows) = self.max_row_group_row_count {
372            if in_progress.buffered_rows + batch.num_rows() > max_rows {
373                let to_write = max_rows - in_progress.buffered_rows;
374                let a = batch.slice(0, to_write);
375                let b = batch.slice(to_write, batch.num_rows() - to_write);
376                self.write(&a)?;
377                return self.write(&b);
378            }
379        }
380
381        // Check byte limit: if we have buffered data, use measured average row size
382        // to split batch proactively before exceeding byte limit
383        if let Some(max_bytes) = self.max_row_group_bytes {
384            if in_progress.buffered_rows > 0 {
385                let current_bytes = in_progress.get_estimated_total_bytes();
386
387                if current_bytes >= max_bytes {
388                    self.flush()?;
389                    return self.write(batch);
390                }
391
392                let avg_row_bytes = current_bytes / in_progress.buffered_rows;
393                if avg_row_bytes > 0 {
394                    // At this point, `current_bytes < max_bytes` (checked above)
395                    let remaining_bytes = max_bytes - current_bytes;
396                    let rows_that_fit = remaining_bytes / avg_row_bytes;
397
398                    if batch.num_rows() > rows_that_fit {
399                        if rows_that_fit > 0 {
400                            let a = batch.slice(0, rows_that_fit);
401                            let b = batch.slice(rows_that_fit, batch.num_rows() - rows_that_fit);
402                            self.write(&a)?;
403                            return self.write(&b);
404                        } else {
405                            self.flush()?;
406                            return self.write(batch);
407                        }
408                    }
409                }
410            }
411        }
412
413        match self.cdc_chunkers.as_mut() {
414            Some(chunkers) => in_progress.write_with_chunkers(batch, chunkers)?,
415            None => in_progress.write(batch)?,
416        }
417
418        let should_flush = self
419            .max_row_group_row_count
420            .is_some_and(|max| in_progress.buffered_rows >= max)
421            || self
422                .max_row_group_bytes
423                .is_some_and(|max| in_progress.get_estimated_total_bytes() >= max);
424
425        if should_flush {
426            self.flush()?
427        }
428        Ok(())
429    }
430
431    /// Writes the given buf bytes to the internal buffer.
432    ///
433    /// It's safe to use this method to write data to the underlying writer,
434    /// because it will ensure that the buffering and byte‐counting layers are used.
435    pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
436        self.writer.write_all(buf)
437    }
438
439    /// Flushes underlying writer
440    pub fn sync(&mut self) -> std::io::Result<()> {
441        self.writer.flush()
442    }
443
444    /// Flushes all buffered rows into a new row group
445    ///
446    /// Note the underlying writer is not flushed with this call.
447    /// If this is a desired behavior, please call [`ArrowWriter::sync`].
448    pub fn flush(&mut self) -> Result<()> {
449        let in_progress = match self.in_progress.take() {
450            Some(in_progress) => in_progress,
451            None => return Ok(()),
452        };
453
454        let mut row_group_writer = self.writer.next_row_group()?;
455        for chunk in in_progress.close()? {
456            chunk.append_to_row_group(&mut row_group_writer)?;
457        }
458        row_group_writer.close()?;
459        Ok(())
460    }
461
462    /// Additional [`KeyValue`] metadata to be written in addition to those from [`WriterProperties`]
463    ///
464    /// This method provide a way to append kv_metadata after write RecordBatch
465    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
466        self.writer.append_key_value_metadata(kv_metadata)
467    }
468
469    /// Returns a reference to the underlying writer.
470    pub fn inner(&self) -> &W {
471        self.writer.inner()
472    }
473
474    /// Returns a mutable reference to the underlying writer.
475    ///
476    /// **Warning**: if you write directly to this writer, you will skip
477    /// the `TrackedWrite` buffering and byte‐counting layers. That’ll cause
478    /// the file footer’s recorded offsets and sizes to diverge from reality,
479    /// resulting in an unreadable or corrupted Parquet file.
480    ///
481    /// If you want to write safely to the underlying writer, use [`Self::write_all`].
482    pub fn inner_mut(&mut self) -> &mut W {
483        self.writer.inner_mut()
484    }
485
486    /// Flushes any outstanding data and returns the underlying writer.
487    pub fn into_inner(mut self) -> Result<W> {
488        self.flush()?;
489        self.writer.into_inner()
490    }
491
492    /// Close and finalize the underlying Parquet writer
493    ///
494    /// Unlike [`Self::close`] this does not consume self
495    ///
496    /// Attempting to write after calling finish will result in an error
497    pub fn finish(&mut self) -> Result<ParquetMetaData> {
498        self.flush()?;
499        self.writer.finish()
500    }
501
502    /// Close and finalize the underlying Parquet writer
503    pub fn close(mut self) -> Result<ParquetMetaData> {
504        self.finish()
505    }
506
507    /// Create a new row group writer and return its column writers.
508    #[deprecated(
509        since = "56.2.0",
510        note = "Use `ArrowRowGroupWriterFactory` instead, see `ArrowColumnWriter` for an example"
511    )]
512    pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
513        self.flush()?;
514        let in_progress = self
515            .row_group_writer_factory
516            .create_row_group_writer(self.writer.flushed_row_groups().len())?;
517        Ok(in_progress.writers)
518    }
519
520    /// Append the given column chunks to the file as a new row group.
521    #[deprecated(
522        since = "56.2.0",
523        note = "Use `SerializedFileWriter` directly instead, see `ArrowColumnWriter` for an example"
524    )]
525    pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
526        let mut row_group_writer = self.writer.next_row_group()?;
527        for chunk in chunks {
528            chunk.append_to_row_group(&mut row_group_writer)?;
529        }
530        row_group_writer.close()?;
531        Ok(())
532    }
533
534    /// Converts this writer into a lower-level [`SerializedFileWriter`] and [`ArrowRowGroupWriterFactory`].
535    ///
536    /// Flushes any outstanding data before returning.
537    ///
538    /// This can be useful to provide more control over how files are written, for example
539    /// to write columns in parallel. See the example on [`ArrowColumnWriter`].
540    pub fn into_serialized_writer(
541        mut self,
542    ) -> Result<(SerializedFileWriter<W>, ArrowRowGroupWriterFactory)> {
543        self.flush()?;
544        Ok((self.writer, self.row_group_writer_factory))
545    }
546}
547
548impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
549    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
550        self.write(batch).map_err(|e| e.into())
551    }
552
553    fn close(self) -> std::result::Result<(), ArrowError> {
554        self.close()?;
555        Ok(())
556    }
557}
558
559/// Arrow-specific configuration settings for writing parquet files.
560///
561/// See [`ArrowWriter`] for how to configure the writer.
562#[derive(Debug, Clone, Default)]
563pub struct ArrowWriterOptions {
564    properties: WriterProperties,
565    skip_arrow_metadata: bool,
566    schema_root: Option<String>,
567    schema_descr: Option<SchemaDescriptor>,
568    page_store_factory: Option<Arc<dyn PageStoreFactory>>,
569}
570
571impl ArrowWriterOptions {
572    /// Creates a new [`ArrowWriterOptions`] with the default settings.
573    pub fn new() -> Self {
574        Self::default()
575    }
576
577    /// Sets the [`WriterProperties`] for writing parquet files.
578    pub fn with_properties(self, properties: WriterProperties) -> Self {
579        Self { properties, ..self }
580    }
581
582    /// Sets the [`PageStoreFactory`] used to buffer completed pages while a row
583    /// group is being written.
584    ///
585    /// The default implementation ([`InMemoryPageStore`]) buffers all completed
586    /// pages on the heap until the row group is flushed, so peak write memory
587    /// grows with the row group size. Using this API, pages can be spilled to a
588    /// file or object storage instead, reducing peak write memory substantially
589    /// at the expense of an extra write to and read from secondary storage.
590    ///
591    /// # Example: spilling pages to a temp file
592    ///
593    /// A simple spilling backend uses one temp file per column chunk; `put`
594    /// appends the page and `take` reads it back.
595    ///
596    /// ```
597    /// # use std::fs::File;
598    /// # use std::io::{Read, Seek, SeekFrom, Write};
599    /// # use std::sync::Arc;
600    /// # use bytes::Bytes;
601    /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
602    /// # use parquet::arrow::arrow_writer::{
603    /// #     ArrowWriter, ArrowWriterOptions, PageKey, PageStore, PageStoreArgs, PageStoreFactory,
604    /// # };
605    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
606    /// # use parquet::errors::Result;
607    /// struct TempFilePageStore {
608    ///     file: File,
609    ///     /// Total size of the file
610    ///     end: u64,
611    ///     /// Location of pages: (offset, len)
612    ///     locs: Vec<(u64, usize)>,
613    /// }
614    ///
615    /// impl PageStore for TempFilePageStore {
616    ///     fn put(&mut self, value: Bytes) -> Result<PageKey> {
617    ///         // Append to the end of the file
618    ///         self.file.seek(SeekFrom::Start(self.end))?;
619    ///         self.file.write_all(&value)?;
620    ///         let key = PageKey::new(self.locs.len() as u64);
621    ///         self.locs.push((self.end, value.len()));
622    ///         self.end += value.len() as u64;
623    ///         Ok(key)
624    ///     }
625    ///
626    ///     fn take(&mut self, key: PageKey) -> Result<Bytes> {
627    ///         let (offset, len) = self.locs[key.get() as usize];
628    ///         let mut buf = vec![0u8; len];
629    ///         self.file.seek(SeekFrom::Start(offset))?;
630    ///         self.file.read_exact(&mut buf)?;
631    ///         Ok(Bytes::from(buf))
632    ///     }
633    /// }
634    ///
635    /// /// Factory for creating [`TempFilePageStore`]
636    /// #[derive(Debug)]
637    /// struct TempFilePageStoreFactory;
638    ///
639    /// impl PageStoreFactory for TempFilePageStoreFactory {
640    ///     fn create(&self, args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
641    ///         // `args` exposes the column index and descriptor (physical/logical
642    ///         // type, path), so a real backend might choose to spill only large columns.
643    ///         let _ = (args.column_index(), args.column_descriptor());
644    ///         Ok(Box::new(TempFilePageStore {
645    ///             file: tempfile::tempfile()?, // temp file is cleaned on drop
646    ///             end: 0,
647    ///             locs: Vec::new(),
648    ///         }))
649    ///     }
650    /// }
651    /// // write 1000 integers
652    /// let col = Arc::new(Int64Array::from_iter_values(0..1000)) as ArrayRef;
653    /// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
654    ///
655    /// let options =
656    ///     ArrowWriterOptions::new().with_page_store_factory(Arc::new(TempFilePageStoreFactory));
657    /// let mut buffer = Vec::new();
658    /// let mut writer =
659    ///     ArrowWriter::try_new_with_options(&mut buffer, to_write.schema(), options).unwrap();
660    /// writer.write(&to_write).unwrap();
661    /// writer.close().unwrap();
662    ///
663    /// // buffer now holds valid Parquet data, which can be read as normal:
664    /// let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap();
665    /// assert_eq!(to_write, reader.next().unwrap().unwrap());
666    /// ```
667    pub fn with_page_store_factory(self, page_store_factory: Arc<dyn PageStoreFactory>) -> Self {
668        Self {
669            page_store_factory: Some(page_store_factory),
670            ..self
671        }
672    }
673
674    /// Skip encoding the embedded arrow metadata (defaults to `false`)
675    ///
676    /// Parquet files generated by the [`ArrowWriter`] contain embedded arrow schema
677    /// by default.
678    ///
679    /// Set `skip_arrow_metadata` to true, to skip encoding the embedded metadata.
680    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
681        Self {
682            skip_arrow_metadata,
683            ..self
684        }
685    }
686
687    /// Set the name of the root parquet schema element (defaults to `"arrow_schema"`)
688    pub fn with_schema_root(self, schema_root: String) -> Self {
689        Self {
690            schema_root: Some(schema_root),
691            ..self
692        }
693    }
694
695    /// Explicitly specify the Parquet schema to be used
696    ///
697    /// If omitted (the default), the [`ArrowSchemaConverter`] is used to compute the
698    /// Parquet [`SchemaDescriptor`]. This may be used When the [`SchemaDescriptor`] is
699    /// already known or must be calculated using custom logic.
700    pub fn with_parquet_schema(self, schema_descr: SchemaDescriptor) -> Self {
701        Self {
702            schema_descr: Some(schema_descr),
703            ..self
704        }
705    }
706}
707
708/// A single column chunk produced by [`ArrowColumnWriter`].
709///
710/// Holds the serialized page blobs (each page's header ‖ compressed data, in
711/// write order) in a [`PageStore`], plus the handles needed to read them back,
712/// in order, when the chunk is spliced into the output file.
713struct ArrowColumnChunkData {
714    length: usize,
715    store: Box<dyn PageStore>,
716    keys: Vec<PageKey>,
717    /// Handles to the dictionary page's blobs (header then data) in the store.
718    ///
719    /// A dictionary page is produced at most once and bounded by
720    /// `dict_page_size_limit`, but it must be written *first* in the chunk even
721    /// though the data pages reach the writer before it (see
722    /// [`PageWriter::defers_dictionary_ordering`]). Its header and data are `put`
723    /// into the store like any other page — which keeps the store uniform, and
724    /// lets an oversized dictionary page spill — and their handles are held apart
725    /// so they can be emitted ahead of the data pages at splice.
726    /// Empty for non-dictionary columns.
727    dictionary_keys: Vec<PageKey>,
728    /// Serialized length of the dictionary page (0 if there is none), recorded
729    /// so the data pages can be shifted past it when offsets are rewritten to a
730    /// dictionary-first layout at splice.
731    dictionary_len: usize,
732}
733
734impl ArrowColumnChunkData {
735    fn new(store: Box<dyn PageStore>) -> Self {
736        Self {
737            length: 0,
738            store,
739            keys: Vec::new(),
740            dictionary_keys: Vec::new(),
741            dictionary_len: 0,
742        }
743    }
744
745    /// Append a data-page blob to the store, recording its handle in write
746    /// order.
747    fn push(&mut self, value: Bytes) -> Result<()> {
748        let key = self.store.put(value)?;
749        self.keys.push(key);
750        Ok(())
751    }
752
753    /// Store a dictionary-page blob (header or data) in the page store,
754    /// recording its handle (emitted first at splice) and accumulating its
755    /// serialized length.
756    fn push_dictionary(&mut self, value: Bytes) -> Result<()> {
757        self.dictionary_len += value.len();
758        let key = self.store.put(value)?;
759        self.dictionary_keys.push(key);
760        Ok(())
761    }
762
763    /// Bytes this chunk currently holds on the heap: whatever the store keeps
764    /// resident (zero for a spilling backend).
765    fn memory_size(&self) -> usize {
766        self.store.memory_size()
767    }
768}
769
/// A streaming [`Read`] over one column chunk's buffered pages, in final file
/// order: the dictionary page (if any) first, then the data pages.
///
/// Each page is stored as two blobs (its serialized header, then its data).
/// Each blob is taken back out of the [`PageStore`] *as it is consumed* and
/// released immediately afterwards. Splicing a chunk into the output file
/// therefore never materializes more than a single blob in memory at a time.
/// This is what keeps the splice phase within the memory bound for a spilling
/// backend (an in-memory store already holds the bytes, so it is unaffected).
struct StreamingColumnChunkReader {
    /// Store the blobs are taken from as the reader advances.
    store: Box<dyn PageStore>,
    /// Page handles in final file order: the dictionary page first (if any),
    /// then the data pages.
    keys: IntoIter<PageKey>,
    /// The blob currently being drained into the output; emptied as it is read.
    current: Bytes,
}
786
787impl StreamingColumnChunkReader {
788    fn new(data: ArrowColumnChunkData) -> Self {
789        // The dictionary page must be emitted first, ahead of the data pages,
790        // even though it was the last page produced.
791        let keys = if data.dictionary_keys.is_empty() {
792            data.keys
793        } else {
794            let mut keys = Vec::with_capacity(data.dictionary_keys.len() + data.keys.len());
795            keys.extend(data.dictionary_keys);
796            keys.extend(data.keys);
797            keys
798        };
799        Self {
800            store: data.store,
801            keys: keys.into_iter(),
802            current: Bytes::new(),
803        }
804    }
805}
806
807impl Read for StreamingColumnChunkReader {
808    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
809        // Refill from the next blob whenever the current one is drained: the
810        // dictionary page first, then each data page, all taken from the store.
811        while self.current.is_empty() {
812            if let Some(key) = self.keys.next() {
813                self.current = self.store.take(key).map_err(std::io::Error::other)?;
814            } else {
815                return Ok(0);
816            }
817        }
818
819        let len = self.current.len().min(out.len());
820        let b = self.current.split_to(len);
821        out[..len].copy_from_slice(&b);
822        Ok(len)
823    }
824}
825
/// A shared [`ArrowColumnChunkData`]
///
/// The [`ArrowPageWriter`] handed to the column writer holds one handle and
/// fills it with pages. The owning [`ArrowColumnWriter`] holds another, so
/// it can reclaim the buffered pages when it is closed (for example when
/// [`ArrowRowGroupWriter`] flushes). This avoids self-referential borrows.
type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
831
/// [`PageWriter`] that buffers each completed (and, if configured, encrypted)
/// page in a [`SharedColumnChunk`] rather than writing it to the output file.
struct ArrowPageWriter {
    /// Page buffer shared with the owning [`ArrowColumnWriter`].
    buffer: SharedColumnChunk,
    /// Encryptor for this column's pages, if the column is encrypted.
    #[cfg(feature = "encryption")]
    page_encryptor: Option<PageEncryptor>,
}
837
838impl ArrowPageWriter {
839    /// Create a page writer that buffers completed pages in `store`.
840    fn new(store: Box<dyn PageStore>) -> Self {
841        Self {
842            buffer: Arc::new(Mutex::new(ArrowColumnChunkData::new(store))),
843            #[cfg(feature = "encryption")]
844            page_encryptor: None,
845        }
846    }
847
848    #[cfg(feature = "encryption")]
849    pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
850        self.page_encryptor = page_encryptor;
851        self
852    }
853
854    #[cfg(feature = "encryption")]
855    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
856        self.page_encryptor.as_mut()
857    }
858
859    #[cfg(not(feature = "encryption"))]
860    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
861        None
862    }
863}
864
impl PageWriter for ArrowPageWriter {
    /// Encrypts (if configured) and serializes `page`, then buffers its header
    /// and data as two blobs in the shared chunk instead of writing them out.
    ///
    /// The returned spec's `offset` is the page's position in the order pages
    /// were *received*. On this path the dictionary page arrives after the
    /// data pages; it is moved to the front (and offsets rewritten) at splice.
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
        // Encrypt the page body first; the header below is built from the
        // (possibly encrypted) page so it describes the page as stored.
        let page = match self.page_encryptor_mut() {
            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
            None => page,
        };

        let page_header = page.to_thrift_header()?;
        let header = {
            let mut header = Vec::with_capacity(1024);

            match self.page_encryptor_mut() {
                Some(page_encryptor) => {
                    page_encryptor.encrypt_page_header(&page_header, &mut header)?;
                    // Only data pages advance the encryptor's page counter.
                    if page.compressed_page().is_data_page() {
                        page_encryptor.increment_page();
                    }
                }
                None => {
                    let mut protocol = ThriftCompactOutputProtocol::new(&mut header);
                    page_header.write_thrift(&mut protocol)?;
                }
            };

            Bytes::from(header)
        };

        // `try_lock` panics if the chunk is locked elsewhere; presumably this
        // page writer is the only user while pages are being written — confirm.
        let mut buf = self.buffer.try_lock().unwrap();

        let data = page.compressed_page().buffer().clone();
        let compressed_size = data.len() + header.len();

        let mut spec = PageWriteSpec::new();
        spec.page_type = page.page_type();
        spec.num_values = page.num_values();
        spec.uncompressed_size = page.uncompressed_size() + header.len();
        spec.offset = buf.length as u64;
        spec.compressed_size = compressed_size;
        spec.bytes_written = compressed_size as u64;

        // NOTE(review): `length` advances before the blobs are stored, so a
        // failed push below leaves it counting bytes that were never buffered.
        buf.length += compressed_size;
        if spec.page_type == PageType::DICTIONARY_PAGE {
            // Recorded apart from the data pages so it is emitted first at
            // splice — see `ArrowColumnChunkData::dictionary_keys`.
            buf.push_dictionary(header)?;
            buf.push_dictionary(data)?;
        } else {
            buf.push(header)?;
            buf.push(data)?;
        }

        Ok(spec)
    }

    fn defers_dictionary_ordering(&self) -> bool {
        // The Arrow chunk is buffered in full and spliced at row-group flush, so
        // data pages may be accepted before the dictionary page and reordered
        // then. This lets `GenericColumnWriter` stream dictionary-column data
        // pages straight through instead of buffering them in memory.
        true
    }

    fn buffered_memory_size(&self) -> usize {
        // Only what is actually resident: a spilling store reports ~0 here even
        // though the chunk's bytes have all passed through it.
        self.buffer.try_lock().unwrap().memory_size()
    }

    /// Nothing to flush: pages stay buffered until the chunk is spliced into
    /// a row group.
    fn close(&mut self) -> Result<()> {
        Ok(())
    }
}
937
/// A leaf column that can be encoded by [`ArrowColumnWriter`]
///
/// Wraps the leaf values together with their definition and repetition
/// levels, as computed by [`compute_leaves`].
#[derive(Debug)]
pub struct ArrowLeafColumn(ArrayLevels);
941
942/// Computes the [`ArrowLeafColumn`] for a potentially nested [`ArrayRef`]
943///
944/// This function can be used along with [`get_column_writers`] to encode
945/// individual columns in parallel. See example on [`ArrowColumnWriter`]
946pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
947    let levels = calculate_array_levels(array, field)?;
948    Ok(levels.into_iter().map(ArrowLeafColumn).collect())
949}
950
/// The data for a single column chunk, see [`ArrowColumnWriter`]
///
/// Produced by [`ArrowColumnWriter::close`]. It holds the buffered pages
/// together with the writer's [`ColumnCloseResult`] until the chunk is spliced
/// into a row group with [`ArrowColumnChunk::append_to_row_group`].
pub struct ArrowColumnChunk {
    /// Buffered pages (and the page store holding them) for this chunk.
    data: ArrowColumnChunkData,
    /// Metadata produced when the column writer was closed.
    close: ColumnCloseResult,
}
956
957impl std::fmt::Debug for ArrowColumnChunk {
958    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
959        f.debug_struct("ArrowColumnChunk")
960            .field("length", &self.data.length)
961            .finish_non_exhaustive()
962    }
963}
964
965impl ArrowColumnChunk {
966    /// Returns the [`ColumnCloseResult`] produced when the chunk was closed.
967    ///
968    /// Exposes encoding information, collected statistics, and the optional
969    /// [`ColumnIndexMetaData`](crate::file::page_index::column_index::ColumnIndexMetaData)
970    /// / [`OffsetIndexMetaData`](crate::file::page_index::offset_index::OffsetIndexMetaData)
971    /// gathered for the column chunk.
972    pub fn close(&self) -> &ColumnCloseResult {
973        &self.close
974    }
975
976    /// Returns a mutable reference to the [`ColumnCloseResult`].
977    ///
978    /// This allows callers to mutate the close result before the chunk is
979    /// appended to a row group — for example, clearing `column_index` or
980    /// `bloom_filter` based on a dynamic rule that inspects the encodings and
981    /// collected page statistics.
982    pub fn close_mut(&mut self) -> &mut ColumnCloseResult {
983        &mut self.close
984    }
985
986    /// Splices this column's buffered pages into the row group, streaming them
987    /// back out of the [`PageStore`] one page at a time.
988    pub fn append_to_row_group<W: Write + Send>(
989        self,
990        writer: &mut SerializedRowGroupWriter<'_, W>,
991    ) -> Result<()> {
992        let ArrowColumnChunk { data, close } = self;
993
994        // The dictionary page is produced *after* the data pages on this path (so
995        // they can stream straight through) but must be written *first*, so move
996        // it ahead of the data pages in the recorded offsets before the splice.
997        let close = close.update_dictionary_location(data.dictionary_len)?;
998
999        let reader = StreamingColumnChunkReader::new(data);
1000        writer.append_column_from_read(reader, close)
1001    }
1002}
1003
/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
///
/// `ArrowColumnWriter` instances can be created using an [`ArrowRowGroupWriterFactory`].
/// Once closed, the resulting [`ArrowColumnChunk`] is spliced into a row group
/// with [`ArrowColumnChunk::append_to_row_group`].
///
/// Note: This is a low-level interface for applications that require
/// fine-grained control of encoding (e.g. encoding using multiple threads),
/// see [`ArrowWriter`] for a higher-level interface
///
/// # Example: Encoding two Arrow Arrays in Parallel
/// ```
/// // The arrow schema
/// # use std::sync::Arc;
/// # use arrow_array::*;
/// # use arrow_schema::*;
/// # use parquet::arrow::ArrowSchemaConverter;
/// # use parquet::arrow::arrow_writer::{compute_leaves, ArrowColumnChunk, ArrowLeafColumn, ArrowRowGroupWriterFactory};
/// # use parquet::file::properties::WriterProperties;
/// # use parquet::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
/// #
/// let schema = Arc::new(Schema::new(vec![
///     Field::new("i32", DataType::Int32, false),
///     Field::new("f32", DataType::Float32, false),
/// ]));
///
/// // Compute the parquet schema
/// let props = Arc::new(WriterProperties::default());
/// let parquet_schema = ArrowSchemaConverter::new()
///   .with_coerce_types(props.coerce_types())
///   .convert(&schema)
///   .unwrap();
///
/// // Create parquet writer
/// let root_schema = parquet_schema.root_schema_ptr();
/// // write to memory in the example, but this could be a File
/// let mut out = Vec::with_capacity(1024);
/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone())
///   .unwrap();
///
/// // Create a factory for building Arrow column writers
/// let row_group_factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(&schema));
/// // Create column writers for the 0th row group
/// let col_writers = row_group_factory.create_column_writers(0).unwrap();
///
/// // Spawn a worker thread for each column
/// //
/// // Note: This is for demonstration purposes; a thread pool (e.g. rayon or tokio) would be better.
/// // The `map` produces an iterator of type `tuple of (thread handle, send channel)`.
/// let mut workers: Vec<_> = col_writers
///     .into_iter()
///     .map(|mut col_writer| {
///         let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
///         let handle = std::thread::spawn(move || {
///             // receive Arrays to encode via the channel
///             for col in recv {
///                 col_writer.write(&col)?;
///             }
///             // once the input is complete, close the writer
///             // to return the newly created ArrowColumnChunk
///             col_writer.close()
///         });
///         (handle, send)
///     })
///     .collect();
///
/// // Start row group
/// let mut row_group_writer: SerializedRowGroupWriter<'_, _> = writer
///   .next_row_group()
///   .unwrap();
///
/// // Create some example input columns to encode
/// let to_write = vec![
///     Arc::new(Int32Array::from_iter_values([1, 2, 3])) as _,
///     Arc::new(Float32Array::from_iter_values([1., 45., -1.])) as _,
/// ];
///
/// // Send the input columns to the workers
/// let mut worker_iter = workers.iter_mut();
/// for (arr, field) in to_write.iter().zip(&schema.fields) {
///     for leaves in compute_leaves(field, arr).unwrap() {
///         worker_iter.next().unwrap().1.send(leaves).unwrap();
///     }
/// }
///
/// // Wait for the workers to complete encoding, and append
/// // the resulting column chunks to the row group (and the file)
/// for (handle, send) in workers {
///     drop(send); // Drop send side to signal termination
///     // wait for the worker to send the completed chunk
///     let chunk: ArrowColumnChunk = handle.join().unwrap().unwrap();
///     chunk.append_to_row_group(&mut row_group_writer).unwrap();
/// }
/// // Close the row group which writes to the underlying file
/// row_group_writer.close().unwrap();
///
/// let metadata = writer.close().unwrap();
/// assert_eq!(metadata.file_metadata().num_rows(), 3);
/// ```
pub struct ArrowColumnWriter {
    /// The typed column writer that performs the encoding.
    writer: ArrowColumnWriterImpl,
    /// Handle to the page buffer that the writer's page writer fills;
    /// reclaimed by [`ArrowColumnWriter::close`].
    chunk: SharedColumnChunk,
}
1105
1106impl std::fmt::Debug for ArrowColumnWriter {
1107    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1108        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
1109    }
1110}
1111
/// The concrete column writer behind an [`ArrowColumnWriter`].
enum ArrowColumnWriterImpl {
    /// Byte-array leaves: (Large)Utf8, (Large)Binary and their view types, plus
    /// dictionaries of those or of FixedSizeBinary, encoded via
    /// [`ByteArrayEncoder`].
    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
    /// Every other leaf, via the typed [`ColumnWriter`].
    Column(ColumnWriter<'static>),
}
1116
1117impl ArrowColumnWriter {
1118    /// Write an [`ArrowLeafColumn`]
1119    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
1120        self.write_internal(&col.0)
1121    }
1122
1123    /// Write with content-defined chunking, inserting page flushes at chunk boundaries.
1124    fn write_with_chunker(
1125        &mut self,
1126        col: &ArrowLeafColumn,
1127        chunker: &mut ContentDefinedChunker,
1128    ) -> Result<()> {
1129        let levels = &col.0;
1130        let chunks = chunker.get_arrow_chunks(
1131            levels.def_level_data().as_ref(),
1132            levels.rep_level_data().as_ref(),
1133            levels.array(),
1134        )?;
1135
1136        let num_chunks = chunks.len();
1137        for (i, chunk) in chunks.iter().enumerate() {
1138            let chunk_levels = levels.slice_for_chunk(chunk);
1139            self.write_internal(&chunk_levels)?;
1140
1141            // Add a page break after each chunk except the last
1142            if i + 1 < num_chunks {
1143                match &mut self.writer {
1144                    ArrowColumnWriterImpl::Column(c) => c.add_data_page()?,
1145                    ArrowColumnWriterImpl::ByteArray(c) => c.add_data_page()?,
1146                }
1147            }
1148        }
1149        Ok(())
1150    }
1151
1152    fn write_internal(&mut self, levels: &ArrayLevels) -> Result<()> {
1153        match &mut self.writer {
1154            ArrowColumnWriterImpl::Column(c) => {
1155                let leaf = levels.array();
1156                match leaf.as_any_dictionary_opt() {
1157                    Some(dictionary) => {
1158                        let materialized =
1159                            arrow_select::take::take(dictionary.values(), dictionary.keys(), None)?;
1160                        write_leaf(c, &materialized, levels)?
1161                    }
1162                    None => write_leaf(c, leaf, levels)?,
1163                };
1164            }
1165            ArrowColumnWriterImpl::ByteArray(c) => {
1166                write_primitive(c, levels.array().as_ref(), levels)?;
1167            }
1168        }
1169        Ok(())
1170    }
1171
1172    /// Close this column returning the written [`ArrowColumnChunk`]
1173    pub fn close(self) -> Result<ArrowColumnChunk> {
1174        let close = match self.writer {
1175            ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
1176            ArrowColumnWriterImpl::Column(c) => c.close()?,
1177        };
1178        let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
1179        let data = chunk.into_inner().unwrap();
1180        Ok(ArrowColumnChunk { data, close })
1181    }
1182
1183    /// Returns the estimated total memory usage by the writer.
1184    ///
1185    /// This  [`Self::get_estimated_total_bytes`] this is an estimate
1186    /// of the current memory usage and not it's anticipated encoded size.
1187    ///
1188    /// This includes:
1189    /// 1. Data buffered in encoded form
1190    /// 2. Data buffered in un-encoded form (e.g. `usize` dictionary keys)
1191    ///
1192    /// This value should be greater than or equal to [`Self::get_estimated_total_bytes`]
1193    pub fn memory_size(&self) -> usize {
1194        match &self.writer {
1195            ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
1196            ArrowColumnWriterImpl::Column(c) => c.memory_size(),
1197        }
1198    }
1199
1200    /// Returns the estimated total encoded bytes for this column writer.
1201    ///
1202    /// This includes:
1203    /// 1. Data buffered in encoded form
1204    /// 2. An estimate of how large the data buffered in un-encoded form would be once encoded
1205    ///
1206    /// This value should be less than or equal to [`Self::memory_size`]
1207    pub fn get_estimated_total_bytes(&self) -> usize {
1208        match &self.writer {
1209            ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
1210            ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
1211        }
1212    }
1213}
1214
/// Encodes [`RecordBatch`] to a parquet row group
///
/// Note: this structure is internal and not exposed publicly. It is created by
/// [`ArrowRowGroupWriterFactory`] and wraps the [`ArrowColumnWriter`]s for a
/// single row group.
///
/// See the example on [`ArrowColumnWriter`] for how to encode columns in parallel
#[derive(Debug)]
struct ArrowRowGroupWriter {
    /// One writer per parquet leaf column, in schema order.
    writers: Vec<ArrowColumnWriter>,
    /// Arrow schema whose fields are paired with each batch's columns.
    schema: SchemaRef,
    /// Number of rows written to this row group so far.
    buffered_rows: usize,
}
1227
1228impl ArrowRowGroupWriter {
1229    fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
1230        Self {
1231            writers,
1232            schema: arrow.clone(),
1233            buffered_rows: 0,
1234        }
1235    }
1236
1237    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
1238        self.buffered_rows += batch.num_rows();
1239        let mut writers = self.writers.iter_mut();
1240        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
1241            for leaf in compute_leaves(field.as_ref(), column)? {
1242                writers.next().unwrap().write(&leaf)?;
1243            }
1244        }
1245        Ok(())
1246    }
1247
1248    fn write_with_chunkers(
1249        &mut self,
1250        batch: &RecordBatch,
1251        chunkers: &mut [ContentDefinedChunker],
1252    ) -> Result<()> {
1253        self.buffered_rows += batch.num_rows();
1254        let mut writers = self.writers.iter_mut();
1255        let mut chunkers = chunkers.iter_mut();
1256        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
1257            for leaf in compute_leaves(field.as_ref(), column)? {
1258                writers
1259                    .next()
1260                    .unwrap()
1261                    .write_with_chunker(&leaf, chunkers.next().unwrap())?;
1262            }
1263        }
1264        Ok(())
1265    }
1266
1267    /// Returns the estimated total encoded bytes for this row group
1268    fn get_estimated_total_bytes(&self) -> usize {
1269        self.writers
1270            .iter()
1271            .map(|x| x.get_estimated_total_bytes())
1272            .sum()
1273    }
1274
1275    fn close(self) -> Result<Vec<ArrowColumnChunk>> {
1276        self.writers
1277            .into_iter()
1278            .map(|writer| writer.close())
1279            .collect()
1280    }
1281}
1282
/// Factory that creates new column writers for each row group in the Parquet file.
///
/// You can create this structure via [`ArrowWriter::into_serialized_writer`]
/// or [`ArrowRowGroupWriterFactory::new`].
/// See the example on [`ArrowColumnWriter`] for how to encode columns in parallel
#[derive(Debug)]
pub struct ArrowRowGroupWriterFactory {
    /// Parquet schema of the file; its leaf columns are assigned to writers in order.
    schema: SchemaDescPtr,
    /// Arrow schema of the batches being written.
    arrow_schema: SchemaRef,
    /// Writer properties passed to every column writer.
    props: WriterPropertiesPtr,
    /// Allocates the page store backing each column chunk.
    page_store_factory: Arc<dyn PageStoreFactory>,
    /// The file's encryptor, if the file is encrypted.
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
1296
1297impl ArrowRowGroupWriterFactory {
1298    /// Create a new [`ArrowRowGroupWriterFactory`] for the provided file writer and Arrow schema
1299    pub fn new<W: Write + Send>(
1300        file_writer: &SerializedFileWriter<W>,
1301        arrow_schema: SchemaRef,
1302    ) -> Self {
1303        let schema = Arc::clone(file_writer.schema_descr_ptr());
1304        let props = Arc::clone(file_writer.properties());
1305        Self {
1306            schema,
1307            arrow_schema,
1308            props,
1309            page_store_factory: Arc::new(InMemoryPageStoreFactory),
1310            #[cfg(feature = "encryption")]
1311            file_encryptor: file_writer.file_encryptor(),
1312        }
1313    }
1314
1315    /// Set the [`PageStoreFactory`] used to allocate the buffer for each column
1316    /// chunk, e.g. to spill completed pages to a temp file or object storage
1317    /// instead of the heap. Defaults to [`InMemoryPageStoreFactory`].
1318    pub fn with_page_store_factory(
1319        mut self,
1320        page_store_factory: Arc<dyn PageStoreFactory>,
1321    ) -> Self {
1322        self.page_store_factory = page_store_factory;
1323        self
1324    }
1325
1326    fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
1327        let writers = self.create_column_writers(row_group_index)?;
1328        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
1329    }
1330
1331    /// Create column writers for a new row group, with the given row group index
1332    pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
1333        let mut writers = Vec::with_capacity(self.arrow_schema.fields.len());
1334        let mut leaves = self.schema.columns().iter();
1335        let column_factory = self.column_writer_factory(row_group_index);
1336        for field in &self.arrow_schema.fields {
1337            column_factory.get_arrow_column_writer(
1338                field.data_type(),
1339                &self.props,
1340                &mut leaves,
1341                &mut writers,
1342            )?;
1343        }
1344        Ok(writers)
1345    }
1346
1347    #[cfg(feature = "encryption")]
1348    fn column_writer_factory(&self, row_group_idx: usize) -> ArrowColumnWriterFactory {
1349        ArrowColumnWriterFactory::new()
1350            .with_page_store_factory(self.page_store_factory.clone())
1351            .with_file_encryptor(row_group_idx, self.file_encryptor.clone())
1352    }
1353
1354    #[cfg(not(feature = "encryption"))]
1355    fn column_writer_factory(&self, _row_group_idx: usize) -> ArrowColumnWriterFactory {
1356        ArrowColumnWriterFactory::new().with_page_store_factory(self.page_store_factory.clone())
1357    }
1358}
1359
1360/// Returns [`ArrowColumnWriter`]s for each column in a given schema
1361#[deprecated(since = "57.0.0", note = "Use `ArrowRowGroupWriterFactory` instead")]
1362pub fn get_column_writers(
1363    parquet: &SchemaDescriptor,
1364    props: &WriterPropertiesPtr,
1365    arrow: &SchemaRef,
1366) -> Result<Vec<ArrowColumnWriter>> {
1367    let mut writers = Vec::with_capacity(arrow.fields.len());
1368    let mut leaves = parquet.columns().iter();
1369    let column_factory = ArrowColumnWriterFactory::new();
1370    for field in &arrow.fields {
1371        column_factory.get_arrow_column_writer(
1372            field.data_type(),
1373            props,
1374            &mut leaves,
1375            &mut writers,
1376        )?;
1377    }
1378    Ok(writers)
1379}
1380
/// Creates [`ArrowColumnWriter`] instances
///
/// Each writer gets its own page store from `page_store_factory`. With the
/// `encryption` feature, it also gets a page encryptor when its column is
/// encrypted.
struct ArrowColumnWriterFactory {
    /// Allocates the per-column-chunk [`PageStore`] backing each page writer.
    page_store_factory: Arc<dyn PageStoreFactory>,
    /// Row group the created writers belong to; used when creating page encryptors.
    #[cfg(feature = "encryption")]
    row_group_index: usize,
    /// The file's encryptor, if any.
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
1390
1391impl ArrowColumnWriterFactory {
1392    pub fn new() -> Self {
1393        Self {
1394            page_store_factory: Arc::new(InMemoryPageStoreFactory),
1395            #[cfg(feature = "encryption")]
1396            row_group_index: 0,
1397            #[cfg(feature = "encryption")]
1398            file_encryptor: None,
1399        }
1400    }
1401
1402    /// Use `page_store_factory` to allocate the buffer for each column chunk.
1403    pub fn with_page_store_factory(
1404        mut self,
1405        page_store_factory: Arc<dyn PageStoreFactory>,
1406    ) -> Self {
1407        self.page_store_factory = page_store_factory;
1408        self
1409    }
1410
1411    #[cfg(feature = "encryption")]
1412    pub fn with_file_encryptor(
1413        mut self,
1414        row_group_index: usize,
1415        file_encryptor: Option<Arc<FileEncryptor>>,
1416    ) -> Self {
1417        self.row_group_index = row_group_index;
1418        self.file_encryptor = file_encryptor;
1419        self
1420    }
1421
1422    #[cfg(feature = "encryption")]
1423    fn create_page_writer(
1424        &self,
1425        column_descriptor: &ColumnDescPtr,
1426        column_index: usize,
1427    ) -> Result<Box<ArrowPageWriter>> {
1428        let column_path = column_descriptor.path().string();
1429        let page_encryptor = PageEncryptor::create_if_column_encrypted(
1430            &self.file_encryptor,
1431            self.row_group_index,
1432            column_index,
1433            &column_path,
1434        )?;
1435        let args = PageStoreArgs::new(column_index, column_descriptor);
1436        let store = self.page_store_factory.create(&args)?;
1437        Ok(Box::new(
1438            ArrowPageWriter::new(store).with_encryptor(page_encryptor),
1439        ))
1440    }
1441
1442    #[cfg(not(feature = "encryption"))]
1443    fn create_page_writer(
1444        &self,
1445        column_descriptor: &ColumnDescPtr,
1446        column_index: usize,
1447    ) -> Result<Box<ArrowPageWriter>> {
1448        let args = PageStoreArgs::new(column_index, column_descriptor);
1449        let store = self.page_store_factory.create(&args)?;
1450        Ok(Box::new(ArrowPageWriter::new(store)))
1451    }
1452
1453    /// Gets an [`ArrowColumnWriter`] for the given `data_type`, appending the
1454    /// output ColumnDesc to `leaves` and the column writers to `out`
1455    fn get_arrow_column_writer(
1456        &self,
1457        data_type: &ArrowDataType,
1458        props: &WriterPropertiesPtr,
1459        leaves: &mut Iter<'_, ColumnDescPtr>,
1460        out: &mut Vec<ArrowColumnWriter>,
1461    ) -> Result<()> {
1462        // Instantiate writers for normal columns
1463        let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1464            let page_writer = self.create_page_writer(desc, out.len())?;
1465            let chunk = page_writer.buffer.clone();
1466            let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
1467            Ok(ArrowColumnWriter {
1468                chunk,
1469                writer: ArrowColumnWriterImpl::Column(writer),
1470            })
1471        };
1472
1473        // Instantiate writers for byte arrays (e.g. Utf8,  Binary, etc)
1474        let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1475            let page_writer = self.create_page_writer(desc, out.len())?;
1476            let chunk = page_writer.buffer.clone();
1477            let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
1478            Ok(ArrowColumnWriter {
1479                chunk,
1480                writer: ArrowColumnWriterImpl::ByteArray(writer),
1481            })
1482        };
1483
1484        match data_type {
1485            _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
1486            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
1487                out.push(col(leaves.next().unwrap())?)
1488            }
1489            ArrowDataType::LargeBinary
1490            | ArrowDataType::Binary
1491            | ArrowDataType::Utf8
1492            | ArrowDataType::LargeUtf8
1493            | ArrowDataType::BinaryView
1494            | ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
1495            ArrowDataType::List(f)
1496            | ArrowDataType::LargeList(f)
1497            | ArrowDataType::FixedSizeList(f, _)
1498            | ArrowDataType::ListView(f)
1499            | ArrowDataType::LargeListView(f) => {
1500                self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
1501            }
1502            ArrowDataType::Struct(fields) => {
1503                for field in fields {
1504                    self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
1505                }
1506            }
1507            ArrowDataType::Map(f, _) => match f.data_type() {
1508                ArrowDataType::Struct(f) => {
1509                    self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
1510                    self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
1511                }
1512                _ => unreachable!("invalid map type"),
1513            },
1514            ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1515                ArrowDataType::Utf8
1516                | ArrowDataType::LargeUtf8
1517                | ArrowDataType::Binary
1518                | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
1519                ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
1520                    out.push(bytes(leaves.next().unwrap())?)
1521                }
1522                ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
1523                _ => out.push(col(leaves.next().unwrap())?),
1524            },
1525            ArrowDataType::RunEndEncoded(_, value_field) => {
1526                self.get_arrow_column_writer(value_field.data_type(), props, leaves, out)?
1527            }
1528            _ => {
1529                return Err(ParquetError::NYI(format!(
1530                    "Attempting to write an Arrow type {data_type} to parquet that is not yet implemented"
1531                )));
1532            }
1533        }
1534        Ok(())
1535    }
1536}
1537
1538fn write_leaf(
1539    writer: &mut ColumnWriter<'_>,
1540    column: &dyn arrow_array::Array,
1541    levels: &ArrayLevels,
1542) -> Result<usize> {
1543    let indices = levels.non_null_indices();
1544
1545    match writer {
1546        // Note: this should match the contents of arrow_to_parquet_type
1547        ColumnWriter::Int32ColumnWriter(typed) => {
1548            match column.data_type() {
1549                ArrowDataType::Null => {
1550                    let array = Int32Array::new_null(column.len());
1551                    write_primitive(typed, array.values(), levels)
1552                }
1553                ArrowDataType::Int8 => {
1554                    let array: Int32Array = column.as_primitive::<Int8Type>().unary(|x| x as i32);
1555                    write_primitive(typed, array.values(), levels)
1556                }
1557                ArrowDataType::Int16 => {
1558                    let array: Int32Array = column.as_primitive::<Int16Type>().unary(|x| x as i32);
1559                    write_primitive(typed, array.values(), levels)
1560                }
1561                ArrowDataType::Int32 => {
1562                    write_primitive(typed, column.as_primitive::<Int32Type>().values(), levels)
1563                }
1564                ArrowDataType::UInt8 => {
1565                    let array: Int32Array = column.as_primitive::<UInt8Type>().unary(|x| x as i32);
1566                    write_primitive(typed, array.values(), levels)
1567                }
1568                ArrowDataType::UInt16 => {
1569                    let array: Int32Array = column.as_primitive::<UInt16Type>().unary(|x| x as i32);
1570                    write_primitive(typed, array.values(), levels)
1571                }
1572                ArrowDataType::UInt32 => {
1573                    // follow C++ implementation and use overflow/reinterpret cast from  u32 to i32 which will map
1574                    // `(i32::MAX as u32)..u32::MAX` to `i32::MIN..0`
1575                    let array = column.as_primitive::<UInt32Type>();
1576                    write_primitive(typed, array.values().inner().typed_data(), levels)
1577                }
1578                ArrowDataType::Date32 => {
1579                    let array = column.as_primitive::<Date32Type>();
1580                    write_primitive(typed, array.values(), levels)
1581                }
1582                ArrowDataType::Time32(TimeUnit::Second) => {
1583                    let array = column.as_primitive::<Time32SecondType>();
1584                    write_primitive(typed, array.values(), levels)
1585                }
1586                ArrowDataType::Time32(TimeUnit::Millisecond) => {
1587                    let array = column.as_primitive::<Time32MillisecondType>();
1588                    write_primitive(typed, array.values(), levels)
1589                }
1590                ArrowDataType::Date64 => {
1591                    // If the column is a Date64, we truncate it
1592                    let array: Int32Array = column
1593                        .as_primitive::<Date64Type>()
1594                        .unary(|x| (x / 86_400_000) as _);
1595
1596                    write_primitive(typed, array.values(), levels)
1597                }
1598                ArrowDataType::Decimal32(_, _) => {
1599                    let array = column
1600                        .as_primitive::<Decimal32Type>()
1601                        .unary::<_, Int32Type>(|v| v);
1602                    write_primitive(typed, array.values(), levels)
1603                }
1604                ArrowDataType::Decimal64(_, _) => {
1605                    // use the int32 to represent the decimal with low precision
1606                    let array = column
1607                        .as_primitive::<Decimal64Type>()
1608                        .unary::<_, Int32Type>(|v| v as i32);
1609                    write_primitive(typed, array.values(), levels)
1610                }
1611                ArrowDataType::Decimal128(_, _) => {
1612                    // use the int32 to represent the decimal with low precision
1613                    let array = column
1614                        .as_primitive::<Decimal128Type>()
1615                        .unary::<_, Int32Type>(|v| v as i32);
1616                    write_primitive(typed, array.values(), levels)
1617                }
1618                ArrowDataType::Decimal256(_, _) => {
1619                    // use the int32 to represent the decimal with low precision
1620                    let array = column
1621                        .as_primitive::<Decimal256Type>()
1622                        .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1623                    write_primitive(typed, array.values(), levels)
1624                }
1625                d => Err(ParquetError::General(format!("Cannot coerce {d} to I32"))),
1626            }
1627        }
1628        ColumnWriter::BoolColumnWriter(typed) => {
1629            let array = column.as_boolean();
1630            let values = get_bool_array_slice(array, indices.iter().copied());
1631            typed.write_batch_internal(
1632                values.as_slice(),
1633                None,
1634                levels.def_level_data().as_ref(),
1635                levels.rep_level_data().as_ref(),
1636                None,
1637                None,
1638                None,
1639            )
1640        }
1641        ColumnWriter::Int64ColumnWriter(typed) => {
1642            match column.data_type() {
1643                ArrowDataType::Date64 => {
1644                    let array = column
1645                        .as_primitive::<Date64Type>()
1646                        .reinterpret_cast::<Int64Type>();
1647
1648                    write_primitive(typed, array.values(), levels)
1649                }
1650                ArrowDataType::Int64 => {
1651                    let array = column.as_primitive::<Int64Type>();
1652                    write_primitive(typed, array.values(), levels)
1653                }
1654                ArrowDataType::UInt64 => {
1655                    let values = column.as_primitive::<UInt64Type>().values();
1656                    // follow C++ implementation and use overflow/reinterpret cast from  u64 to i64 which will map
1657                    // `(i64::MAX as u64)..u64::MAX` to `i64::MIN..0`
1658                    let array = values.inner().typed_data::<i64>();
1659                    write_primitive(typed, array, levels)
1660                }
1661                ArrowDataType::Time64(TimeUnit::Microsecond) => {
1662                    let array = column.as_primitive::<Time64MicrosecondType>();
1663                    write_primitive(typed, array.values(), levels)
1664                }
1665                ArrowDataType::Time64(TimeUnit::Nanosecond) => {
1666                    let array = column.as_primitive::<Time64NanosecondType>();
1667                    write_primitive(typed, array.values(), levels)
1668                }
1669                ArrowDataType::Timestamp(unit, _) => match unit {
1670                    TimeUnit::Second => {
1671                        let array = column.as_primitive::<TimestampSecondType>();
1672                        write_primitive(typed, array.values(), levels)
1673                    }
1674                    TimeUnit::Millisecond => {
1675                        let array = column.as_primitive::<TimestampMillisecondType>();
1676                        write_primitive(typed, array.values(), levels)
1677                    }
1678                    TimeUnit::Microsecond => {
1679                        let array = column.as_primitive::<TimestampMicrosecondType>();
1680                        write_primitive(typed, array.values(), levels)
1681                    }
1682                    TimeUnit::Nanosecond => {
1683                        let array = column.as_primitive::<TimestampNanosecondType>();
1684                        write_primitive(typed, array.values(), levels)
1685                    }
1686                },
1687                ArrowDataType::Duration(unit) => match unit {
1688                    TimeUnit::Second => {
1689                        let array = column.as_primitive::<DurationSecondType>();
1690                        write_primitive(typed, array.values(), levels)
1691                    }
1692                    TimeUnit::Millisecond => {
1693                        let array = column.as_primitive::<DurationMillisecondType>();
1694                        write_primitive(typed, array.values(), levels)
1695                    }
1696                    TimeUnit::Microsecond => {
1697                        let array = column.as_primitive::<DurationMicrosecondType>();
1698                        write_primitive(typed, array.values(), levels)
1699                    }
1700                    TimeUnit::Nanosecond => {
1701                        let array = column.as_primitive::<DurationNanosecondType>();
1702                        write_primitive(typed, array.values(), levels)
1703                    }
1704                },
1705                ArrowDataType::Decimal64(_, _) => {
1706                    let array = column
1707                        .as_primitive::<Decimal64Type>()
1708                        .reinterpret_cast::<Int64Type>();
1709                    write_primitive(typed, array.values(), levels)
1710                }
1711                ArrowDataType::Decimal128(_, _) => {
1712                    // use the int64 to represent the decimal with low precision
1713                    let array = column
1714                        .as_primitive::<Decimal128Type>()
1715                        .unary::<_, Int64Type>(|v| v as i64);
1716                    write_primitive(typed, array.values(), levels)
1717                }
1718                ArrowDataType::Decimal256(_, _) => {
1719                    // use the int64 to represent the decimal with low precision
1720                    let array = column
1721                        .as_primitive::<Decimal256Type>()
1722                        .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1723                    write_primitive(typed, array.values(), levels)
1724                }
1725                d => Err(ParquetError::General(format!("Cannot coerce {d} to I64"))),
1726            }
1727        }
1728        ColumnWriter::Int96ColumnWriter(_typed) => {
1729            unreachable!("Currently unreachable because data type not supported")
1730        }
1731        ColumnWriter::FloatColumnWriter(typed) => {
1732            let array = column.as_primitive::<Float32Type>();
1733            write_primitive(typed, array.values(), levels)
1734        }
1735        ColumnWriter::DoubleColumnWriter(typed) => {
1736            let array = column.as_primitive::<Float64Type>();
1737            write_primitive(typed, array.values(), levels)
1738        }
1739        ColumnWriter::ByteArrayColumnWriter(_) => {
1740            unreachable!("should use ByteArrayWriter")
1741        }
1742        ColumnWriter::FixedLenByteArrayColumnWriter(typed) => {
1743            let bytes = match column.data_type() {
1744                ArrowDataType::Interval(interval_unit) => match interval_unit {
1745                    IntervalUnit::YearMonth => {
1746                        let array = column.as_primitive::<IntervalYearMonthType>();
1747                        get_interval_ym_array_slice(array, indices.iter().copied())
1748                    }
1749                    IntervalUnit::DayTime => {
1750                        let array = column.as_primitive::<IntervalDayTimeType>();
1751                        get_interval_dt_array_slice(array, indices.iter().copied())
1752                    }
1753                    _ => {
1754                        return Err(ParquetError::NYI(format!(
1755                            "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
1756                        )));
1757                    }
1758                },
1759                ArrowDataType::FixedSizeBinary(_) => {
1760                    let array = column.as_fixed_size_binary();
1761                    get_fsb_array_slice(array, indices.iter().copied())
1762                }
1763                ArrowDataType::Decimal32(_, _) => {
1764                    let array = column.as_primitive::<Decimal32Type>();
1765                    get_decimal_32_array_slice(array, indices.iter().copied())
1766                }
1767                ArrowDataType::Decimal64(_, _) => {
1768                    let array = column.as_primitive::<Decimal64Type>();
1769                    get_decimal_64_array_slice(array, indices.iter().copied())
1770                }
1771                ArrowDataType::Decimal128(_, _) => {
1772                    let array = column.as_primitive::<Decimal128Type>();
1773                    get_decimal_128_array_slice(array, indices.iter().copied())
1774                }
1775                ArrowDataType::Decimal256(_, _) => {
1776                    let array = column.as_primitive::<Decimal256Type>();
1777                    get_decimal_256_array_slice(array, indices.iter().copied())
1778                }
1779                ArrowDataType::Float16 => {
1780                    let array = column.as_primitive::<Float16Type>();
1781                    get_float_16_array_slice(array, indices.iter().copied())
1782                }
1783                _ => {
1784                    return Err(ParquetError::NYI(
1785                        "Attempting to write an Arrow type that is not yet implemented".to_string(),
1786                    ));
1787                }
1788            };
1789            typed.write_batch_internal(
1790                bytes.as_slice(),
1791                None,
1792                levels.def_level_data().as_ref(),
1793                levels.rep_level_data().as_ref(),
1794                None,
1795                None,
1796                None,
1797            )
1798        }
1799    }
1800}
1801
1802fn write_primitive<E: ColumnValueEncoder>(
1803    writer: &mut GenericColumnWriter<E>,
1804    values: &E::Values,
1805    levels: &ArrayLevels,
1806) -> Result<usize> {
1807    writer.write_batch_internal(
1808        values,
1809        Some(levels.non_null_indices()),
1810        levels.def_level_data().as_ref(),
1811        levels.rep_level_data().as_ref(),
1812        None,
1813        None,
1814        None,
1815    )
1816}
1817
1818fn get_bool_array_slice(
1819    array: &arrow_array::BooleanArray,
1820    indices: impl ExactSizeIterator<Item = usize>,
1821) -> Vec<bool> {
1822    let mut values = Vec::with_capacity(indices.len());
1823    for i in indices {
1824        values.push(array.value(i))
1825    }
1826    values
1827}
1828
1829/// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each).
1830/// An Arrow YearMonth interval only stores months, thus only the first 4 bytes are populated.
1831fn get_interval_ym_array_slice(
1832    array: &arrow_array::IntervalYearMonthArray,
1833    indices: impl ExactSizeIterator<Item = usize>,
1834) -> Vec<FixedLenByteArray> {
1835    let mut values = Vec::with_capacity(indices.len());
1836    for i in indices {
1837        let mut value = array.value(i).to_le_bytes().to_vec();
1838        let mut suffix = vec![0; 8];
1839        value.append(&mut suffix);
1840        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1841    }
1842    values
1843}
1844
1845/// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each).
1846/// An Arrow DayTime interval only stores days and millis, thus the first 4 bytes are not populated.
1847fn get_interval_dt_array_slice(
1848    array: &arrow_array::IntervalDayTimeArray,
1849    indices: impl ExactSizeIterator<Item = usize>,
1850) -> Vec<FixedLenByteArray> {
1851    let mut values = Vec::with_capacity(indices.len());
1852    for i in indices {
1853        let mut out = [0; 12];
1854        let value = array.value(i);
1855        out[4..8].copy_from_slice(&value.days.to_le_bytes());
1856        out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1857        values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1858    }
1859    values
1860}
1861
1862fn get_decimal_32_array_slice(
1863    array: &arrow_array::Decimal32Array,
1864    indices: impl ExactSizeIterator<Item = usize>,
1865) -> Vec<FixedLenByteArray> {
1866    let mut values = Vec::with_capacity(indices.len());
1867    let size = decimal_length_from_precision(array.precision());
1868    for i in indices {
1869        let as_be_bytes = array.value(i).to_be_bytes();
1870        let resized_value = as_be_bytes[(4 - size)..].to_vec();
1871        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1872    }
1873    values
1874}
1875
1876fn get_decimal_64_array_slice(
1877    array: &arrow_array::Decimal64Array,
1878    indices: impl ExactSizeIterator<Item = usize>,
1879) -> Vec<FixedLenByteArray> {
1880    let mut values = Vec::with_capacity(indices.len());
1881    let size = decimal_length_from_precision(array.precision());
1882    for i in indices {
1883        let as_be_bytes = array.value(i).to_be_bytes();
1884        let resized_value = as_be_bytes[(8 - size)..].to_vec();
1885        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1886    }
1887    values
1888}
1889
1890fn get_decimal_128_array_slice(
1891    array: &arrow_array::Decimal128Array,
1892    indices: impl ExactSizeIterator<Item = usize>,
1893) -> Vec<FixedLenByteArray> {
1894    let mut values = Vec::with_capacity(indices.len());
1895    let size = decimal_length_from_precision(array.precision());
1896    for i in indices {
1897        let as_be_bytes = array.value(i).to_be_bytes();
1898        let resized_value = as_be_bytes[(16 - size)..].to_vec();
1899        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1900    }
1901    values
1902}
1903
1904fn get_decimal_256_array_slice(
1905    array: &arrow_array::Decimal256Array,
1906    indices: impl ExactSizeIterator<Item = usize>,
1907) -> Vec<FixedLenByteArray> {
1908    let mut values = Vec::with_capacity(indices.len());
1909    let size = decimal_length_from_precision(array.precision());
1910    for i in indices {
1911        let as_be_bytes = array.value(i).to_be_bytes();
1912        let resized_value = as_be_bytes[(32 - size)..].to_vec();
1913        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1914    }
1915    values
1916}
1917
1918fn get_float_16_array_slice(
1919    array: &arrow_array::Float16Array,
1920    indices: impl ExactSizeIterator<Item = usize>,
1921) -> Vec<FixedLenByteArray> {
1922    let mut values = Vec::with_capacity(indices.len());
1923    for i in indices {
1924        let value = array.value(i).to_le_bytes().to_vec();
1925        values.push(FixedLenByteArray::from(ByteArray::from(value)));
1926    }
1927    values
1928}
1929
1930fn get_fsb_array_slice(
1931    array: &arrow_array::FixedSizeBinaryArray,
1932    indices: impl ExactSizeIterator<Item = usize>,
1933) -> Vec<FixedLenByteArray> {
1934    let mut values = Vec::with_capacity(indices.len());
1935    for i in indices {
1936        let value = array.value(i).to_vec();
1937        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1938    }
1939    values
1940}
1941
1942#[cfg(test)]
1943mod tests {
1944    use super::*;
1945    use std::collections::HashMap;
1946
1947    use std::fs::File;
1948
1949    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1950    use crate::arrow::{ARROW_SCHEMA_META_KEY, PARQUET_FIELD_ID_META_KEY};
1951    use crate::column::page::{Page, PageReader};
1952    use crate::file::metadata::thrift::PageHeader;
1953    use crate::file::page_index::column_index::ColumnIndexMetaData;
1954    use crate::file::reader::SerializedPageReader;
1955    use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
1956    use crate::schema::types::ColumnPath;
1957    use arrow::datatypes::ToByteSlice;
1958    use arrow::datatypes::{DataType, Schema};
1959    use arrow::error::Result as ArrowResult;
1960    use arrow::util::data_gen::create_random_array;
1961    use arrow::util::pretty::pretty_format_batches;
1962    use arrow::{array::*, buffer::Buffer};
1963    use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer, OffsetBuffer, i256};
1964    use arrow_schema::Fields;
1965    use half::f16;
1966    use num_traits::{FromPrimitive, ToPrimitive};
1967    use tempfile::tempfile;
1968
1969    use crate::basic::Encoding;
1970    use crate::data_type::AsBytes;
1971    use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
1972    use crate::file::properties::{
1973        BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1974    };
1975    use crate::file::serialized_reader::ReadOptionsBuilder;
1976    use crate::file::{
1977        reader::{FileReader, SerializedFileReader},
1978        statistics::Statistics,
1979    };
1980
1981    /// A [`PageStore`] that allocates *sparse, non-contiguous* handles and keeps
1982    /// blobs in a `HashMap` — nothing like the default `Vec<Bytes>`. Used to
1983    /// prove the writer relies only on the opaque-handle contract and never on
1984    /// handles being dense `Vec` indices. Records how many blobs were stored.
1985    #[derive(Debug, Default)]
1986    struct RecordingPageStore {
1987        next: u64,
1988        blobs: HashMap<u64, Bytes>,
1989        puts: Arc<std::sync::atomic::AtomicUsize>,
1990    }
1991
1992    impl PageStore for RecordingPageStore {
1993        fn put(&mut self, value: Bytes) -> Result<PageKey> {
1994            // Deliberately non-sequential, never-zero handles.
1995            let id = 100 + self.next * 7;
1996            self.next += 1;
1997            self.puts.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1998            self.blobs.insert(id, value);
1999            Ok(PageKey::new(id))
2000        }
2001
2002        fn take(&mut self, key: PageKey) -> Result<Bytes> {
2003            self.blobs
2004                .remove(&key.get())
2005                .ok_or_else(|| ParquetError::General(format!("missing key {}", key.get())))
2006        }
2007    }
2008
2009    #[derive(Debug)]
2010    struct RecordingPageStoreFactory {
2011        puts: Arc<std::sync::atomic::AtomicUsize>,
2012    }
2013
2014    impl PageStoreFactory for RecordingPageStoreFactory {
2015        fn create(&self, _args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
2016            Ok(Box::new(RecordingPageStore {
2017                puts: self.puts.clone(),
2018                ..Default::default()
2019            }))
2020        }
2021    }
2022
2023    /// A custom [`PageStore`] must produce byte-identical files to the in-memory
2024    /// default, across dictionary and non-dictionary columns and multiple row
2025    /// groups (so multiple store instances are exercised).
2026    #[test]
2027    fn custom_page_store_is_byte_identical_to_default() {
2028        let schema = Arc::new(Schema::new(vec![
2029            Field::new("i", DataType::Int32, true),
2030            // A low-cardinality string column to exercise the dictionary path.
2031            Field::new("s", DataType::Utf8, true),
2032        ]));
2033        let i = Int32Array::from(vec![Some(1), None, Some(3), Some(4), Some(5), Some(6)]);
2034        let s = StringArray::from(vec![
2035            Some("a"),
2036            Some("bb"),
2037            Some("a"),
2038            None,
2039            Some("bb"),
2040            Some("ccc"),
2041        ]);
2042        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(i), Arc::new(s)]).unwrap();
2043
2044        // Small row groups so multiple column chunks (hence multiple store
2045        // instances) are produced.
2046        let props = WriterProperties::builder()
2047            .set_max_row_group_row_count(Some(3))
2048            .build();
2049
2050        let write = |factory: Option<Arc<dyn PageStoreFactory>>| {
2051            let mut buffer = Vec::new();
2052            let mut opts = ArrowWriterOptions::new().with_properties(props.clone());
2053            if let Some(factory) = factory {
2054                opts = opts.with_page_store_factory(factory);
2055            }
2056            let mut writer =
2057                ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap();
2058            writer.write(&batch).unwrap();
2059            writer.close().unwrap();
2060            buffer
2061        };
2062
2063        let default_bytes = write(None);
2064
2065        let puts = Arc::new(std::sync::atomic::AtomicUsize::new(0));
2066        let custom_bytes = write(Some(Arc::new(RecordingPageStoreFactory {
2067            puts: puts.clone(),
2068        })));
2069
2070        assert!(
2071            puts.load(std::sync::atomic::Ordering::Relaxed) > 0,
2072            "custom PageStore was never written to"
2073        );
2074        assert_eq!(
2075            default_bytes, custom_bytes,
2076            "a custom PageStore must produce byte-identical output to the default"
2077        );
2078    }
2079
2080    /// A dictionary-encoded column written through the deferred-ordering Arrow
2081    /// path must round-trip correctly even with the offset index disabled, when
2082    /// only the chunk-level dictionary/data page offsets are rewritten (there is
2083    /// no offset index to rebuild). Spans multiple data pages so the
2084    /// dictionary-first reordering is exercised.
2085    #[test]
2086    fn dictionary_column_round_trips_with_offset_index_disabled() {
2087        let schema = Arc::new(Schema::new(vec![Field::new("k", DataType::Int32, true)]));
2088
2089        // Low cardinality so the column stays dictionary-encoded; enough rows to
2090        // span several data pages within a single row group.
2091        let values: Vec<Option<i32>> = (0..50_000).map(|i| Some(i % 8)).collect();
2092        let array = Int32Array::from(values.clone());
2093        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
2094
2095        let props = WriterProperties::builder()
2096            .set_offset_index_disabled(true)
2097            .set_data_page_row_count_limit(4096)
2098            .build();
2099        let opts = ArrowWriterOptions::new().with_properties(props);
2100
2101        let mut buffer = Vec::new();
2102        let mut writer =
2103            ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap();
2104        writer.write(&batch).unwrap();
2105        writer.close().unwrap();
2106
2107        let reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), values.len()).unwrap();
2108        let read: Vec<RecordBatch> = reader.collect::<ArrowResult<_>>().unwrap();
2109        let read_values: Vec<Option<i32>> = read
2110            .iter()
2111            .flat_map(|b| b.column(0).as_primitive::<Int32Type>().iter())
2112            .collect();
2113        assert_eq!(read_values, values);
2114    }
2115
2116    /// The dictionary page is routed through the [`PageStore`] like any other
2117    /// page rather than held resident in memory, so a dictionary column chunk's
2118    /// *entire* serialized size — dictionary page included — passes through the
2119    /// store.
2120    #[test]
2121    fn dictionary_page_is_routed_through_the_store() {
2122        /// A store that sums the bytes handed to `put`.
2123        #[derive(Debug, Default)]
2124        struct SizeRecordingPageStore {
2125            blobs: Vec<Bytes>,
2126            bytes_put: Arc<std::sync::atomic::AtomicUsize>,
2127        }
2128        impl PageStore for SizeRecordingPageStore {
2129            fn put(&mut self, value: Bytes) -> Result<PageKey> {
2130                self.bytes_put
2131                    .fetch_add(value.len(), std::sync::atomic::Ordering::Relaxed);
2132                let key = PageKey::new(self.blobs.len() as u64);
2133                self.blobs.push(value);
2134                Ok(key)
2135            }
2136            fn take(&mut self, key: PageKey) -> Result<Bytes> {
2137                Ok(std::mem::take(&mut self.blobs[key.get() as usize]))
2138            }
2139        }
2140        #[derive(Debug)]
2141        struct Factory {
2142            bytes_put: Arc<std::sync::atomic::AtomicUsize>,
2143        }
2144        impl PageStoreFactory for Factory {
2145            fn create(&self, _args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
2146                Ok(Box::new(SizeRecordingPageStore {
2147                    bytes_put: self.bytes_put.clone(),
2148                    ..Default::default()
2149                }))
2150            }
2151        }
2152
2153        let schema = Arc::new(Schema::new(vec![Field::new("s", DataType::Utf8, false)]));
2154        // Low cardinality keeps the column dictionary-encoded with a real,
2155        // non-empty dictionary page.
2156        let values: Vec<&str> = (0..2048)
2157            .map(|i| ["alpha", "beta", "gamma", "delta"][i % 4])
2158            .collect();
2159        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(StringArray::from(values))])
2160            .unwrap();
2161
2162        let bytes_put = Arc::new(std::sync::atomic::AtomicUsize::new(0));
2163        let opts = ArrowWriterOptions::new().with_page_store_factory(Arc::new(Factory {
2164            bytes_put: bytes_put.clone(),
2165        }));
2166
2167        // A single batch / single column means exactly one row group and one
2168        // store instance, so the bytes it saw map to one column chunk.
2169        let mut buffer = Vec::new();
2170        let mut writer =
2171            ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap();
2172        writer.write(&batch).unwrap();
2173        writer.close().unwrap();
2174
2175        let reader = SerializedFileReader::new(Bytes::from(buffer)).unwrap();
2176        let column = reader.metadata().row_group(0).column(0);
2177        assert!(
2178            column.dictionary_page_offset().is_some(),
2179            "expected the column to be dictionary-encoded"
2180        );
2181
2182        // The bytes the store was handed must account for the whole chunk,
2183        // dictionary page included. Holding the dictionary page apart from the
2184        // store would make this fall short by the dictionary page's size.
2185        assert_eq!(
2186            bytes_put.load(std::sync::atomic::Ordering::Relaxed) as i64,
2187            column.compressed_size(),
2188            "the dictionary page must pass through the store like any other page"
2189        );
2190    }
2191
2192    #[test]
2193    fn arrow_writer() {
2194        // define schema
2195        let schema = Schema::new(vec![
2196            Field::new("a", DataType::Int32, false),
2197            Field::new("b", DataType::Int32, true),
2198        ]);
2199
2200        // create some data
2201        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2202        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2203
2204        // build a record batch
2205        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
2206
2207        roundtrip(batch, Some(SMALL_SIZE / 2));
2208    }
2209
2210    fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
2211        let mut buffer = vec![];
2212
2213        let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
2214        writer.write(expected_batch).unwrap();
2215        writer.close().unwrap();
2216
2217        buffer
2218    }
2219
2220    fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
2221        let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
2222        writer.write(expected_batch).unwrap();
2223        writer.into_inner().unwrap()
2224    }
2225
2226    #[test]
2227    fn roundtrip_bytes() {
2228        // define schema
2229        let schema = Arc::new(Schema::new(vec![
2230            Field::new("a", DataType::Int32, false),
2231            Field::new("b", DataType::Int32, true),
2232        ]));
2233
2234        // create some data
2235        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2236        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2237
2238        // build a record batch
2239        let expected_batch =
2240            RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
2241
2242        for buffer in [
2243            get_bytes_after_close(schema.clone(), &expected_batch),
2244            get_bytes_by_into_inner(schema, &expected_batch),
2245        ] {
2246            let cursor = Bytes::from(buffer);
2247            let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
2248
2249            let actual_batch = record_batch_reader
2250                .next()
2251                .expect("No batch found")
2252                .expect("Unable to get batch");
2253
2254            assert_eq!(expected_batch.schema(), actual_batch.schema());
2255            assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
2256            assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
2257            for i in 0..expected_batch.num_columns() {
2258                let expected_data = expected_batch.column(i).to_data();
2259                let actual_data = actual_batch.column(i).to_data();
2260
2261                assert_eq!(expected_data, actual_data);
2262            }
2263        }
2264    }
2265
2266    #[test]
2267    fn arrow_writer_non_null() {
2268        // define schema
2269        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2270
2271        // create some data
2272        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2273
2274        // build a record batch
2275        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2276
2277        roundtrip(batch, Some(SMALL_SIZE / 2));
2278    }
2279
2280    #[test]
2281    fn arrow_writer_list() {
2282        // define schema
2283        let schema = Schema::new(vec![Field::new(
2284            "a",
2285            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
2286            true,
2287        )]);
2288
2289        // create some data
2290        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2291
2292        // Construct a buffer for value offsets, for the nested array:
2293        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
2294        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2295
2296        // Construct a list array from the above two
2297        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2298            DataType::Int32,
2299            false,
2300        ))))
2301        .len(5)
2302        .add_buffer(a_value_offsets)
2303        .add_child_data(a_values.into_data())
2304        .null_bit_buffer(Some(Buffer::from([0b00011011])))
2305        .build()
2306        .unwrap();
2307        let a = ListArray::from(a_list_data);
2308
2309        // build a record batch
2310        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2311
2312        assert_eq!(batch.column(0).null_count(), 1);
2313
2314        // This test fails if the max row group size is less than the batch's length
2315        // see https://github.com/apache/arrow-rs/issues/518
2316        roundtrip(batch, None);
2317    }
2318
2319    #[test]
2320    fn arrow_writer_list_non_null() {
2321        // define schema
2322        let schema = Schema::new(vec![Field::new(
2323            "a",
2324            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
2325            false,
2326        )]);
2327
2328        // create some data
2329        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2330
2331        // Construct a buffer for value offsets, for the nested array:
2332        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
2333        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2334
2335        // Construct a list array from the above two
2336        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2337            DataType::Int32,
2338            false,
2339        ))))
2340        .len(5)
2341        .add_buffer(a_value_offsets)
2342        .add_child_data(a_values.into_data())
2343        .build()
2344        .unwrap();
2345        let a = ListArray::from(a_list_data);
2346
2347        // build a record batch
2348        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2349
2350        // This test fails if the max row group size is less than the batch's length
2351        // see https://github.com/apache/arrow-rs/issues/518
2352        assert_eq!(batch.column(0).null_count(), 0);
2353
2354        roundtrip(batch, None);
2355    }
2356
2357    #[test]
2358    fn arrow_writer_list_view() {
2359        let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2360        let schema = Schema::new(vec![Field::new(
2361            "a",
2362            DataType::ListView(list_field.clone()),
2363            true,
2364        )]);
2365
2366        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
2367        let a = ListViewArray::new(
2368            list_field,
2369            vec![0, 1, 0, 3, 6].into(),
2370            vec![1, 2, 0, 3, 4].into(),
2371            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2372            Some(vec![true, true, false, true, true].into()),
2373        );
2374
2375        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2376
2377        assert_eq!(batch.column(0).null_count(), 1);
2378
2379        roundtrip(batch, None);
2380    }
2381
2382    #[test]
2383    fn arrow_writer_list_view_non_null() {
2384        let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2385        let schema = Schema::new(vec![Field::new(
2386            "a",
2387            DataType::ListView(list_field.clone()),
2388            false,
2389        )]);
2390
2391        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
2392        let a = ListViewArray::new(
2393            list_field,
2394            vec![0, 1, 0, 3, 6].into(),
2395            vec![1, 2, 0, 3, 4].into(),
2396            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2397            None,
2398        );
2399
2400        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2401
2402        assert_eq!(batch.column(0).null_count(), 0);
2403
2404        roundtrip(batch, None);
2405    }
2406
2407    #[test]
2408    fn arrow_writer_list_view_out_of_order() {
2409        let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2410        let schema = Schema::new(vec![Field::new(
2411            "a",
2412            DataType::ListView(list_field.clone()),
2413            false,
2414        )]);
2415
2416        // [[1], [2, 3], [], [7, 8, 9, 10], [4, 5, 6]] - out of order offsets
2417        let a = ListViewArray::new(
2418            list_field,
2419            vec![0, 1, 0, 6, 3].into(),
2420            vec![1, 2, 0, 4, 3].into(),
2421            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2422            None,
2423        );
2424
2425        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2426
2427        roundtrip(batch, None);
2428    }
2429
2430    #[test]
2431    fn arrow_writer_large_list_view() {
2432        let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2433        let schema = Schema::new(vec![Field::new(
2434            "a",
2435            DataType::LargeListView(list_field.clone()),
2436            true,
2437        )]);
2438
2439        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
2440        let a = LargeListViewArray::new(
2441            list_field,
2442            vec![0i64, 1, 0, 3, 6].into(),
2443            vec![1i64, 2, 0, 3, 4].into(),
2444            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2445            Some(vec![true, true, false, true, true].into()),
2446        );
2447
2448        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2449
2450        assert_eq!(batch.column(0).null_count(), 1);
2451
2452        roundtrip(batch, None);
2453    }
2454
2455    #[test]
2456    fn arrow_writer_list_view_with_struct() {
2457        // Test ListView containing Struct: ListView<Struct<Int32, Utf8>>
2458        let struct_fields = Fields::from(vec![
2459            Field::new("id", DataType::Int32, false),
2460            Field::new("name", DataType::Utf8, false),
2461        ]);
2462        let struct_type = DataType::Struct(struct_fields.clone());
2463        let list_field = Arc::new(Field::new("item", struct_type.clone(), false));
2464
2465        let schema = Schema::new(vec![Field::new(
2466            "a",
2467            DataType::ListView(list_field.clone()),
2468            true,
2469        )]);
2470
2471        // Create struct values
2472        let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
2473        let name_array = StringArray::from(vec!["a", "b", "c", "d", "e"]);
2474        let struct_array = StructArray::new(
2475            struct_fields,
2476            vec![Arc::new(id_array), Arc::new(name_array)],
2477            None,
2478        );
2479
2480        // Create ListView: [{1, "a"}, {2, "b"}], null, [{3, "c"}, {4, "d"}, {5, "e"}]
2481        let list_view = ListViewArray::new(
2482            list_field,
2483            vec![0, 2, 2].into(), // offsets
2484            vec![2, 0, 3].into(), // sizes
2485            Arc::new(struct_array),
2486            Some(vec![true, false, true].into()),
2487        );
2488
2489        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_view)]).unwrap();
2490
2491        roundtrip(batch, None);
2492    }
2493
2494    #[test]
2495    fn arrow_writer_binary() {
2496        let string_field = Field::new("a", DataType::Utf8, false);
2497        let binary_field = Field::new("b", DataType::Binary, false);
2498        let schema = Schema::new(vec![string_field, binary_field]);
2499
2500        let raw_string_values = vec!["foo", "bar", "baz", "quux"];
2501        let raw_binary_values = [
2502            b"foo".to_vec(),
2503            b"bar".to_vec(),
2504            b"baz".to_vec(),
2505            b"quux".to_vec(),
2506        ];
2507        let raw_binary_value_refs = raw_binary_values
2508            .iter()
2509            .map(|x| x.as_slice())
2510            .collect::<Vec<_>>();
2511
2512        let string_values = StringArray::from(raw_string_values.clone());
2513        let binary_values = BinaryArray::from(raw_binary_value_refs);
2514        let batch = RecordBatch::try_new(
2515            Arc::new(schema),
2516            vec![Arc::new(string_values), Arc::new(binary_values)],
2517        )
2518        .unwrap();
2519
2520        roundtrip(batch, Some(SMALL_SIZE / 2));
2521    }
2522
2523    #[test]
2524    fn arrow_writer_binary_view() {
2525        let string_field = Field::new("a", DataType::Utf8View, false);
2526        let binary_field = Field::new("b", DataType::BinaryView, false);
2527        let nullable_string_field = Field::new("a", DataType::Utf8View, true);
2528        let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
2529
2530        let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
2531        let raw_binary_values = vec![
2532            b"foo".to_vec(),
2533            b"bar".to_vec(),
2534            b"large payload over 12 bytes".to_vec(),
2535            b"lulu".to_vec(),
2536        ];
2537        let nullable_string_values =
2538            vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
2539
2540        let string_view_values = StringViewArray::from(raw_string_values);
2541        let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
2542        let nullable_string_view_values = StringViewArray::from(nullable_string_values);
2543        let batch = RecordBatch::try_new(
2544            Arc::new(schema),
2545            vec![
2546                Arc::new(string_view_values),
2547                Arc::new(binary_view_values),
2548                Arc::new(nullable_string_view_values),
2549            ],
2550        )
2551        .unwrap();
2552
2553        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
2554        roundtrip(batch, None);
2555    }
2556
2557    #[test]
2558    fn arrow_writer_binary_view_long_value() {
2559        let string_field = Field::new("a", DataType::Utf8View, false);
2560        let binary_field = Field::new("b", DataType::BinaryView, false);
2561        let schema = Schema::new(vec![string_field, binary_field]);
2562
2563        // There is special case validation for long values (greater than 128)
2564        // 128 encodes as 0x80 0x00 0x00 0x00 in little endian, which should
2565        // trigger the long-string UTF-8 validation branch in the plain decoder.
2566        let long = "a".repeat(128);
2567        let raw_string_values = vec!["foo", long.as_str(), "bar"];
2568        let raw_binary_values = vec![b"foo".to_vec(), long.as_bytes().to_vec(), b"bar".to_vec()];
2569
2570        let string_view_values: ArrayRef = Arc::new(StringViewArray::from(raw_string_values));
2571        let binary_view_values: ArrayRef =
2572            Arc::new(BinaryViewArray::from_iter_values(raw_binary_values));
2573
2574        one_column_roundtrip(Arc::clone(&string_view_values), false);
2575        one_column_roundtrip(Arc::clone(&binary_view_values), false);
2576
2577        let batch = RecordBatch::try_new(
2578            Arc::new(schema),
2579            vec![string_view_values, binary_view_values],
2580        )
2581        .unwrap();
2582
2583        // Disable dictionary to exercise plain encoding paths in the reader.
2584        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2585            let props = WriterProperties::builder()
2586                .set_writer_version(version)
2587                .set_dictionary_enabled(false)
2588                .build();
2589            roundtrip_opts(&batch, props);
2590        }
2591    }
2592
2593    fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
2594        let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
2595        let schema = Schema::new(vec![decimal_field]);
2596
2597        let decimal_values = vec![10_000, 50_000, 0, -100]
2598            .into_iter()
2599            .map(Some)
2600            .collect::<Decimal128Array>()
2601            .with_precision_and_scale(precision, scale)
2602            .unwrap();
2603
2604        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
2605    }
2606
2607    #[test]
2608    fn arrow_writer_decimal() {
2609        // int32 to store the decimal value
2610        let batch_int32_decimal = get_decimal_batch(5, 2);
2611        roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
2612        // int64 to store the decimal value
2613        let batch_int64_decimal = get_decimal_batch(12, 2);
2614        roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
2615        // fixed_length_byte_array to store the decimal value
2616        let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
2617        roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
2618    }
2619
2620    #[test]
2621    fn arrow_writer_complex() {
2622        // define schema
2623        let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
2624        let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
2625        let struct_field_g = Arc::new(Field::new_list(
2626            "g",
2627            Field::new_list_field(DataType::Int16, true),
2628            false,
2629        ));
2630        let struct_field_h = Arc::new(Field::new_list(
2631            "h",
2632            Field::new_list_field(DataType::Int16, false),
2633            true,
2634        ));
2635        let struct_field_e = Arc::new(Field::new_struct(
2636            "e",
2637            vec![
2638                struct_field_f.clone(),
2639                struct_field_g.clone(),
2640                struct_field_h.clone(),
2641            ],
2642            false,
2643        ));
2644        let schema = Schema::new(vec![
2645            Field::new("a", DataType::Int32, false),
2646            Field::new("b", DataType::Int32, true),
2647            Field::new_struct(
2648                "c",
2649                vec![struct_field_d.clone(), struct_field_e.clone()],
2650                false,
2651            ),
2652        ]);
2653
2654        // create some data
2655        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2656        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2657        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
2658        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
2659
2660        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2661
2662        // Construct a buffer for value offsets, for the nested array:
2663        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
2664        let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2665
2666        // Construct a list array from the above two
2667        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
2668            .len(5)
2669            .add_buffer(g_value_offsets.clone())
2670            .add_child_data(g_value.to_data())
2671            .build()
2672            .unwrap();
2673        let g = ListArray::from(g_list_data);
2674        // The difference between g and h is that h has a null bitmap
2675        let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
2676            .len(5)
2677            .add_buffer(g_value_offsets)
2678            .add_child_data(g_value.to_data())
2679            .null_bit_buffer(Some(Buffer::from([0b00011011])))
2680            .build()
2681            .unwrap();
2682        let h = ListArray::from(h_list_data);
2683
2684        let e = StructArray::from(vec![
2685            (struct_field_f, Arc::new(f) as ArrayRef),
2686            (struct_field_g, Arc::new(g) as ArrayRef),
2687            (struct_field_h, Arc::new(h) as ArrayRef),
2688        ]);
2689
2690        let c = StructArray::from(vec![
2691            (struct_field_d, Arc::new(d) as ArrayRef),
2692            (struct_field_e, Arc::new(e) as ArrayRef),
2693        ]);
2694
2695        // build a record batch
2696        let batch = RecordBatch::try_new(
2697            Arc::new(schema),
2698            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
2699        )
2700        .unwrap();
2701
2702        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
2703        roundtrip(batch, Some(SMALL_SIZE / 3));
2704    }
2705
2706    #[test]
2707    fn arrow_writer_complex_mixed() {
2708        // This test was added while investigating https://github.com/apache/arrow-rs/issues/244.
2709        // It was subsequently fixed while investigating https://github.com/apache/arrow-rs/issues/245.
2710
2711        // define schema
2712        let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
2713        let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
2714        let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
2715        let schema = Schema::new(vec![Field::new(
2716            "some_nested_object",
2717            DataType::Struct(Fields::from(vec![
2718                offset_field.clone(),
2719                partition_field.clone(),
2720                topic_field.clone(),
2721            ])),
2722            false,
2723        )]);
2724
2725        // create some data
2726        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
2727        let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2728        let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
2729
2730        let some_nested_object = StructArray::from(vec![
2731            (offset_field, Arc::new(offset) as ArrayRef),
2732            (partition_field, Arc::new(partition) as ArrayRef),
2733            (topic_field, Arc::new(topic) as ArrayRef),
2734        ]);
2735
2736        // build a record batch
2737        let batch =
2738            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
2739
2740        roundtrip(batch, Some(SMALL_SIZE / 2));
2741    }
2742
2743    #[test]
2744    fn arrow_writer_map() {
2745        // Note: we are using the JSON Arrow reader for brevity
2746        let json_content = r#"
2747        {"stocks":{"long": "$AAA", "short": "$BBB"}}
2748        {"stocks":{"long": null, "long": "$CCC", "short": null}}
2749        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
2750        "#;
2751        let entries_struct_type = DataType::Struct(Fields::from(vec![
2752            Field::new("key", DataType::Utf8, false),
2753            Field::new("value", DataType::Utf8, true),
2754        ]));
2755        let stocks_field = Field::new(
2756            "stocks",
2757            DataType::Map(
2758                Arc::new(Field::new("entries", entries_struct_type, false)),
2759                false,
2760            ),
2761            true,
2762        );
2763        let schema = Arc::new(Schema::new(vec![stocks_field]));
2764        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
2765        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
2766
2767        let batch = reader.next().unwrap().unwrap();
2768        roundtrip(batch, None);
2769    }
2770
2771    #[test]
2772    fn arrow_writer_2_level_struct() {
2773        // tests writing <struct<struct<primitive>>
2774        let field_c = Field::new("c", DataType::Int32, true);
2775        let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
2776        let type_a = DataType::Struct(vec![field_b.clone()].into());
2777        let field_a = Field::new("a", type_a, true);
2778        let schema = Schema::new(vec![field_a.clone()]);
2779
2780        // create data
2781        let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
2782        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2783            .len(6)
2784            .null_bit_buffer(Some(Buffer::from([0b00100111])))
2785            .add_child_data(c.into_data())
2786            .build()
2787            .unwrap();
2788        let b = StructArray::from(b_data);
2789        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2790            .len(6)
2791            .null_bit_buffer(Some(Buffer::from([0b00101111])))
2792            .add_child_data(b.into_data())
2793            .build()
2794            .unwrap();
2795        let a = StructArray::from(a_data);
2796
2797        assert_eq!(a.null_count(), 1);
2798        assert_eq!(a.column(0).null_count(), 2);
2799
2800        // build a racord batch
2801        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2802
2803        roundtrip(batch, Some(SMALL_SIZE / 2));
2804    }
2805
2806    #[test]
2807    fn arrow_writer_2_level_struct_non_null() {
2808        // tests writing <struct<struct<primitive>>
2809        let field_c = Field::new("c", DataType::Int32, false);
2810        let type_b = DataType::Struct(vec![field_c].into());
2811        let field_b = Field::new("b", type_b.clone(), false);
2812        let type_a = DataType::Struct(vec![field_b].into());
2813        let field_a = Field::new("a", type_a.clone(), false);
2814        let schema = Schema::new(vec![field_a]);
2815
2816        // create data
2817        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2818        let b_data = ArrayDataBuilder::new(type_b)
2819            .len(6)
2820            .add_child_data(c.into_data())
2821            .build()
2822            .unwrap();
2823        let b = StructArray::from(b_data);
2824        let a_data = ArrayDataBuilder::new(type_a)
2825            .len(6)
2826            .add_child_data(b.into_data())
2827            .build()
2828            .unwrap();
2829        let a = StructArray::from(a_data);
2830
2831        assert_eq!(a.null_count(), 0);
2832        assert_eq!(a.column(0).null_count(), 0);
2833
2834        // build a racord batch
2835        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2836
2837        roundtrip(batch, Some(SMALL_SIZE / 2));
2838    }
2839
2840    #[test]
2841    fn arrow_writer_2_level_struct_mixed_null() {
2842        // tests writing <struct<struct<primitive>>
2843        let field_c = Field::new("c", DataType::Int32, false);
2844        let type_b = DataType::Struct(vec![field_c].into());
2845        let field_b = Field::new("b", type_b.clone(), true);
2846        let type_a = DataType::Struct(vec![field_b].into());
2847        let field_a = Field::new("a", type_a.clone(), false);
2848        let schema = Schema::new(vec![field_a]);
2849
2850        // create data
2851        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2852        let b_data = ArrayDataBuilder::new(type_b)
2853            .len(6)
2854            .null_bit_buffer(Some(Buffer::from([0b00100111])))
2855            .add_child_data(c.into_data())
2856            .build()
2857            .unwrap();
2858        let b = StructArray::from(b_data);
2859        // a intentionally has no null buffer, to test that this is handled correctly
2860        let a_data = ArrayDataBuilder::new(type_a)
2861            .len(6)
2862            .add_child_data(b.into_data())
2863            .build()
2864            .unwrap();
2865        let a = StructArray::from(a_data);
2866
2867        assert_eq!(a.null_count(), 0);
2868        assert_eq!(a.column(0).null_count(), 2);
2869
2870        // build a racord batch
2871        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2872
2873        roundtrip(batch, Some(SMALL_SIZE / 2));
2874    }
2875
2876    #[test]
2877    fn arrow_writer_2_level_struct_mixed_null_2() {
2878        // tests writing <struct<struct<primitive>>, where the primitive columns are non-null.
2879        let field_c = Field::new("c", DataType::Int32, false);
2880        let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
2881        let field_e = Field::new(
2882            "e",
2883            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2884            false,
2885        );
2886
2887        let field_b = Field::new(
2888            "b",
2889            DataType::Struct(vec![field_c, field_d, field_e].into()),
2890            false,
2891        );
2892        let type_a = DataType::Struct(vec![field_b.clone()].into());
2893        let field_a = Field::new("a", type_a, true);
2894        let schema = Schema::new(vec![field_a.clone()]);
2895
2896        // create data
2897        let c = Int32Array::from_iter_values(0..6);
2898        let d = FixedSizeBinaryArray::try_from_iter(
2899            ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
2900        )
2901        .expect("four byte values");
2902        let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
2903        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2904            .len(6)
2905            .add_child_data(c.into_data())
2906            .add_child_data(d.into_data())
2907            .add_child_data(e.into_data())
2908            .build()
2909            .unwrap();
2910        let b = StructArray::from(b_data);
2911        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2912            .len(6)
2913            .null_bit_buffer(Some(Buffer::from([0b00100101])))
2914            .add_child_data(b.into_data())
2915            .build()
2916            .unwrap();
2917        let a = StructArray::from(a_data);
2918
2919        assert_eq!(a.null_count(), 3);
2920        assert_eq!(a.column(0).null_count(), 0);
2921
2922        // build a record batch
2923        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2924
2925        roundtrip(batch, Some(SMALL_SIZE / 2));
2926    }
2927
2928    #[test]
2929    fn test_fixed_size_binary_in_dict() {
2930        fn test_fixed_size_binary_in_dict_inner<K>()
2931        where
2932            K: ArrowDictionaryKeyType,
2933            K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
2934            <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
2935        {
2936            let field = Field::new(
2937                "a",
2938                DataType::Dictionary(
2939                    Box::new(K::DATA_TYPE),
2940                    Box::new(DataType::FixedSizeBinary(4)),
2941                ),
2942                false,
2943            );
2944            let schema = Schema::new(vec![field]);
2945
2946            let keys: Vec<K::Native> = vec![
2947                K::Native::try_from(0u8).unwrap(),
2948                K::Native::try_from(0u8).unwrap(),
2949                K::Native::try_from(1u8).unwrap(),
2950            ];
2951            let keys = PrimitiveArray::<K>::from_iter_values(keys);
2952            let values = FixedSizeBinaryArray::try_from_iter(
2953                vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
2954            )
2955            .unwrap();
2956
2957            let data = DictionaryArray::<K>::new(keys, Arc::new(values));
2958            let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
2959            roundtrip(batch, None);
2960        }
2961
2962        test_fixed_size_binary_in_dict_inner::<UInt8Type>();
2963        test_fixed_size_binary_in_dict_inner::<UInt16Type>();
2964        test_fixed_size_binary_in_dict_inner::<UInt32Type>();
2965        test_fixed_size_binary_in_dict_inner::<UInt16Type>();
2966        test_fixed_size_binary_in_dict_inner::<Int8Type>();
2967        test_fixed_size_binary_in_dict_inner::<Int16Type>();
2968        test_fixed_size_binary_in_dict_inner::<Int32Type>();
2969        test_fixed_size_binary_in_dict_inner::<Int64Type>();
2970    }
2971
2972    #[test]
2973    fn test_empty_dict() {
2974        let struct_fields = Fields::from(vec![Field::new(
2975            "dict",
2976            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2977            false,
2978        )]);
2979
2980        let schema = Schema::new(vec![Field::new_struct(
2981            "struct",
2982            struct_fields.clone(),
2983            true,
2984        )]);
2985        let dictionary = Arc::new(DictionaryArray::new(
2986            Int32Array::new_null(5),
2987            Arc::new(StringArray::new_null(0)),
2988        ));
2989
2990        let s = StructArray::new(
2991            struct_fields,
2992            vec![dictionary],
2993            Some(NullBuffer::new_null(5)),
2994        );
2995
2996        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
2997        roundtrip(batch, None);
2998    }
2999    #[test]
3000    fn arrow_writer_page_size() {
3001        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
3002
3003        let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
3004
3005        // Generate an array of 10 unique 10 character string
3006        for i in 0..10 {
3007            let value = i
3008                .to_string()
3009                .repeat(10)
3010                .chars()
3011                .take(10)
3012                .collect::<String>();
3013
3014            builder.append_value(value);
3015        }
3016
3017        let array = Arc::new(builder.finish());
3018
3019        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
3020
3021        let file = tempfile::tempfile().unwrap();
3022
3023        // Set everything very low so we fallback to PLAIN encoding after the first row
3024        let props = WriterProperties::builder()
3025            .set_data_page_size_limit(1)
3026            .set_dictionary_page_size_limit(1)
3027            .set_write_batch_size(1)
3028            .build();
3029
3030        let mut writer =
3031            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
3032                .expect("Unable to write file");
3033        writer.write(&batch).unwrap();
3034        writer.close().unwrap();
3035
3036        let options = ReadOptionsBuilder::new().with_page_index().build();
3037        let reader =
3038            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap();
3039
3040        let column = reader.metadata().row_group(0).columns();
3041
3042        assert_eq!(column.len(), 1);
3043
3044        // We should write one row before falling back to PLAIN encoding so there should still be a
3045        // dictionary page.
3046        assert!(
3047            column[0].dictionary_page_offset().is_some(),
3048            "Expected a dictionary page"
3049        );
3050
3051        assert!(reader.metadata().offset_index().is_some());
3052        let offset_indexes = &reader.metadata().offset_index().unwrap()[0];
3053
3054        let page_locations = offset_indexes[0].page_locations.clone();
3055
3056        // We should fallback to PLAIN encoding after the first row and our max page size is 1 bytes
3057        // so we expect one dictionary encoded page and then a page per row thereafter.
3058        assert_eq!(
3059            page_locations.len(),
3060            10,
3061            "Expected 10 pages but got {page_locations:#?}"
3062        );
3063    }
3064
3065    #[test]
3066    fn arrow_writer_float_nans() {
3067        let f16_field = Field::new("a", DataType::Float16, false);
3068        let f32_field = Field::new("b", DataType::Float32, false);
3069        let f64_field = Field::new("c", DataType::Float64, false);
3070        let schema = Schema::new(vec![f16_field, f32_field, f64_field]);
3071
3072        let f16_values = (0..MEDIUM_SIZE)
3073            .map(|i| {
3074                Some(if i % 2 == 0 {
3075                    f16::NAN
3076                } else {
3077                    f16::from_f32(i as f32)
3078                })
3079            })
3080            .collect::<Float16Array>();
3081
3082        let f32_values = (0..MEDIUM_SIZE)
3083            .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
3084            .collect::<Float32Array>();
3085
3086        let f64_values = (0..MEDIUM_SIZE)
3087            .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
3088            .collect::<Float64Array>();
3089
3090        let batch = RecordBatch::try_new(
3091            Arc::new(schema),
3092            vec![
3093                Arc::new(f16_values),
3094                Arc::new(f32_values),
3095                Arc::new(f64_values),
3096            ],
3097        )
3098        .unwrap();
3099
3100        roundtrip(batch, None);
3101    }
3102
    /// Row count used by most of the single-column round-trip tests below.
    const SMALL_SIZE: usize = 7;
    /// Larger row count, used for example by `arrow_writer_float_nans`.
    const MEDIUM_SIZE: usize = 63;
3105
3106    // Write the batch to parquet and read it back out, ensuring
3107    // that what comes out is the same as what was written in
3108    fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<Bytes> {
3109        let mut files = vec![];
3110        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
3111            let mut props = WriterProperties::builder().set_writer_version(version);
3112
3113            if let Some(size) = max_row_group_size {
3114                props = props.set_max_row_group_row_count(Some(size))
3115            }
3116
3117            let props = props.build();
3118            files.push(roundtrip_opts(&expected_batch, props))
3119        }
3120        files
3121    }
3122
3123    // Round trip the specified record batch with the specified writer properties,
3124    // to an in-memory file, and validate the arrays using the specified function.
3125    // Returns the in-memory file.
3126    fn roundtrip_opts_with_array_validation<F>(
3127        expected_batch: &RecordBatch,
3128        props: WriterProperties,
3129        validate: F,
3130    ) -> Bytes
3131    where
3132        F: Fn(&ArrayData, &ArrayData),
3133    {
3134        let mut file = vec![];
3135
3136        let mut writer = ArrowWriter::try_new(&mut file, expected_batch.schema(), Some(props))
3137            .expect("Unable to write file");
3138        writer.write(expected_batch).unwrap();
3139        writer.close().unwrap();
3140
3141        let file = Bytes::from(file);
3142        let mut record_batch_reader =
3143            ParquetRecordBatchReader::try_new(file.clone(), 1024).unwrap();
3144
3145        let actual_batch = record_batch_reader
3146            .next()
3147            .expect("No batch found")
3148            .expect("Unable to get batch");
3149
3150        assert_eq!(expected_batch.schema(), actual_batch.schema());
3151        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
3152        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
3153        for i in 0..expected_batch.num_columns() {
3154            let expected_data = expected_batch.column(i).to_data();
3155            let actual_data = actual_batch.column(i).to_data();
3156            validate(&expected_data, &actual_data);
3157        }
3158
3159        file
3160    }
3161
3162    fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> Bytes {
3163        roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
3164            a.validate_full().expect("valid expected data");
3165            b.validate_full().expect("valid actual data");
3166            assert_eq!(a, b)
3167        })
3168    }
3169
3170    struct RoundTripOptions {
3171        values: ArrayRef,
3172        schema: SchemaRef,
3173        bloom_filter: bool,
3174        bloom_filter_ndv: Option<u64>,
3175        bloom_filter_position: BloomFilterPosition,
3176    }
3177
3178    impl RoundTripOptions {
3179        fn new(values: ArrayRef, nullable: bool) -> Self {
3180            let data_type = values.data_type().clone();
3181            let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
3182            Self {
3183                values,
3184                schema: Arc::new(schema),
3185                bloom_filter: false,
3186                bloom_filter_ndv: None,
3187                bloom_filter_position: BloomFilterPosition::AfterRowGroup,
3188            }
3189        }
3190    }
3191
3192    fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<Bytes> {
3193        one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
3194    }
3195
3196    fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<Bytes> {
3197        let mut options = RoundTripOptions::new(values, false);
3198        options.schema = schema;
3199        one_column_roundtrip_with_options(options)
3200    }
3201
3202    fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<Bytes> {
3203        let RoundTripOptions {
3204            values,
3205            schema,
3206            bloom_filter,
3207            bloom_filter_ndv,
3208            bloom_filter_position,
3209        } = options;
3210
3211        let encodings = match values.data_type() {
3212            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
3213                vec![
3214                    Encoding::PLAIN,
3215                    Encoding::DELTA_BYTE_ARRAY,
3216                    Encoding::DELTA_LENGTH_BYTE_ARRAY,
3217                ]
3218            }
3219            DataType::Int64
3220            | DataType::Int32
3221            | DataType::Int16
3222            | DataType::Int8
3223            | DataType::UInt64
3224            | DataType::UInt32
3225            | DataType::UInt16
3226            | DataType::UInt8 => vec![
3227                Encoding::PLAIN,
3228                Encoding::DELTA_BINARY_PACKED,
3229                Encoding::BYTE_STREAM_SPLIT,
3230            ],
3231            DataType::Float32 | DataType::Float64 => {
3232                vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
3233            }
3234            _ => vec![Encoding::PLAIN],
3235        };
3236
3237        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3238
3239        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
3240
3241        let mut files = vec![];
3242        for dictionary_size in [0, 1, 1024] {
3243            for encoding in &encodings {
3244                for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
3245                    for row_group_size in row_group_sizes {
3246                        let mut builder = WriterProperties::builder()
3247                            .set_writer_version(version)
3248                            .set_max_row_group_row_count(Some(row_group_size))
3249                            .set_dictionary_enabled(dictionary_size != 0)
3250                            .set_dictionary_page_size_limit(dictionary_size.max(1))
3251                            .set_encoding(*encoding)
3252                            .set_bloom_filter_enabled(bloom_filter)
3253                            .set_bloom_filter_position(bloom_filter_position);
3254                        if let Some(ndv) = bloom_filter_ndv {
3255                            builder = builder.set_bloom_filter_max_ndv(ndv);
3256                        }
3257                        let props = builder.build();
3258
3259                        files.push(roundtrip_opts(&expected_batch, props))
3260                    }
3261                }
3262            }
3263        }
3264        files
3265    }
3266
3267    fn values_required<A, I>(iter: I) -> Vec<Bytes>
3268    where
3269        A: From<Vec<I::Item>> + Array + 'static,
3270        I: IntoIterator,
3271    {
3272        let raw_values: Vec<_> = iter.into_iter().collect();
3273        let values = Arc::new(A::from(raw_values));
3274        one_column_roundtrip(values, false)
3275    }
3276
3277    fn values_optional<A, I>(iter: I) -> Vec<Bytes>
3278    where
3279        A: From<Vec<Option<I::Item>>> + Array + 'static,
3280        I: IntoIterator,
3281    {
3282        let optional_raw_values: Vec<_> = iter
3283            .into_iter()
3284            .enumerate()
3285            .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
3286            .collect();
3287        let optional_values = Arc::new(A::from(optional_raw_values));
3288        one_column_roundtrip(optional_values, true)
3289    }
3290
3291    fn required_and_optional<A, I>(iter: I)
3292    where
3293        A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
3294        I: IntoIterator + Clone,
3295    {
3296        values_required::<A, I>(iter.clone());
3297        values_optional::<A, I>(iter);
3298    }
3299
3300    fn check_bloom_filter<T: AsBytes>(
3301        files: Vec<Bytes>,
3302        file_column: String,
3303        positive_values: Vec<T>,
3304        negative_values: Vec<T>,
3305    ) {
3306        files.into_iter().take(1).for_each(|file| {
3307            let file_reader = SerializedFileReader::new_with_options(
3308                file,
3309                ReadOptionsBuilder::new()
3310                    .with_reader_properties(
3311                        ReaderProperties::builder()
3312                            .set_read_bloom_filter(true)
3313                            .build(),
3314                    )
3315                    .build(),
3316            )
3317            .expect("Unable to open file as Parquet");
3318            let metadata = file_reader.metadata();
3319
3320            // Gets bloom filters from all row groups.
3321            let mut bloom_filters: Vec<_> = vec![];
3322            for (ri, row_group) in metadata.row_groups().iter().enumerate() {
3323                if let Some((column_index, _)) = row_group
3324                    .columns()
3325                    .iter()
3326                    .enumerate()
3327                    .find(|(_, column)| column.column_path().string() == file_column)
3328                {
3329                    let row_group_reader = file_reader
3330                        .get_row_group(ri)
3331                        .expect("Unable to read row group");
3332                    if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
3333                        bloom_filters.push(sbbf.clone());
3334                    } else {
3335                        panic!("No bloom filter for column named {file_column} found");
3336                    }
3337                } else {
3338                    panic!("No column named {file_column} found");
3339                }
3340            }
3341
3342            positive_values.iter().for_each(|value| {
3343                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
3344                assert!(
3345                    found.is_some(),
3346                    "{}",
3347                    format!("Value {:?} should be in bloom filter", value.as_bytes())
3348                );
3349            });
3350
3351            negative_values.iter().for_each(|value| {
3352                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
3353                assert!(
3354                    found.is_none(),
3355                    "{}",
3356                    format!("Value {:?} should not be in bloom filter", value.as_bytes())
3357                );
3358            });
3359        });
3360    }
3361
3362    #[test]
3363    fn all_null_primitive_single_column() {
3364        let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
3365        one_column_roundtrip(values, true);
3366    }
3367    #[test]
3368    fn null_single_column() {
3369        let values = Arc::new(NullArray::new(SMALL_SIZE));
3370        one_column_roundtrip(values, true);
3371        // null arrays are always nullable, a test with non-nullable nulls fails
3372    }
3373
3374    #[test]
3375    fn bool_single_column() {
3376        required_and_optional::<BooleanArray, _>(
3377            [true, false].iter().cycle().copied().take(SMALL_SIZE),
3378        );
3379    }
3380
3381    #[test]
3382    fn bool_large_single_column() {
3383        let values = Arc::new(
3384            [None, Some(true), Some(false)]
3385                .iter()
3386                .cycle()
3387                .copied()
3388                .take(200_000)
3389                .collect::<BooleanArray>(),
3390        );
3391        let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
3392        let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
3393        let file = tempfile::tempfile().unwrap();
3394
3395        let mut writer =
3396            ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
3397                .expect("Unable to write file");
3398        writer.write(&expected_batch).unwrap();
3399        writer.close().unwrap();
3400    }
3401
3402    #[test]
3403    fn check_page_offset_index_with_nan() {
3404        let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
3405        let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
3406        let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
3407
3408        let mut out = Vec::with_capacity(1024);
3409        let mut writer =
3410            ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
3411        writer.write(&batch).unwrap();
3412        let file_meta_data = writer.close().unwrap();
3413        for row_group in file_meta_data.row_groups() {
3414            for column in row_group.columns() {
3415                assert!(column.offset_index_offset().is_some());
3416                assert!(column.offset_index_length().is_some());
3417                assert!(column.column_index_offset().is_none());
3418                assert!(column.column_index_length().is_none());
3419            }
3420        }
3421    }
3422
3423    #[test]
3424    fn i8_single_column() {
3425        required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
3426    }
3427
3428    #[test]
3429    fn i16_single_column() {
3430        required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
3431    }
3432
3433    #[test]
3434    fn i32_single_column() {
3435        required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
3436    }
3437
3438    #[test]
3439    fn i64_single_column() {
3440        required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
3441    }
3442
3443    #[test]
3444    fn u8_single_column() {
3445        required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
3446    }
3447
3448    #[test]
3449    fn u16_single_column() {
3450        required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
3451    }
3452
3453    #[test]
3454    fn u32_single_column() {
3455        required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
3456    }
3457
3458    #[test]
3459    fn u64_single_column() {
3460        required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
3461    }
3462
3463    #[test]
3464    fn f32_single_column() {
3465        required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
3466    }
3467
3468    #[test]
3469    fn f64_single_column() {
3470        required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
3471    }
3472
    // The timestamp tests below call `one_column_roundtrip` directly instead of
    // `required_and_optional`, so they only exercise the required (non-nullable) case.
    // The arrays are built via `From<Vec<i64>>`. Before extending these tests to the
    // optional case, check whether they also support `From<Vec<Option<i64>>>`, which
    // `required_and_optional` needs.
3476
3477    #[test]
3478    fn timestamp_second_single_column() {
3479        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
3480        let values = Arc::new(TimestampSecondArray::from(raw_values));
3481
3482        one_column_roundtrip(values, false);
3483    }
3484
3485    #[test]
3486    fn timestamp_millisecond_single_column() {
3487        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
3488        let values = Arc::new(TimestampMillisecondArray::from(raw_values));
3489
3490        one_column_roundtrip(values, false);
3491    }
3492
3493    #[test]
3494    fn timestamp_microsecond_single_column() {
3495        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
3496        let values = Arc::new(TimestampMicrosecondArray::from(raw_values));
3497
3498        one_column_roundtrip(values, false);
3499    }
3500
3501    #[test]
3502    fn timestamp_nanosecond_single_column() {
3503        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
3504        let values = Arc::new(TimestampNanosecondArray::from(raw_values));
3505
3506        one_column_roundtrip(values, false);
3507    }
3508
3509    #[test]
3510    fn date32_single_column() {
3511        required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
3512    }
3513
3514    #[test]
3515    fn date64_single_column() {
3516        // Date64 must be a multiple of 86400000, see ARROW-10925
3517        required_and_optional::<Date64Array, _>(
3518            (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
3519        );
3520    }
3521
3522    #[test]
3523    fn time32_second_single_column() {
3524        required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
3525    }
3526
3527    #[test]
3528    fn time32_millisecond_single_column() {
3529        required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
3530    }
3531
3532    #[test]
3533    fn time64_microsecond_single_column() {
3534        required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
3535    }
3536
3537    #[test]
3538    fn time64_nanosecond_single_column() {
3539        required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
3540    }
3541
3542    #[test]
3543    fn duration_second_single_column() {
3544        required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
3545    }
3546
3547    #[test]
3548    fn duration_millisecond_single_column() {
3549        required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
3550    }
3551
3552    #[test]
3553    fn duration_microsecond_single_column() {
3554        required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
3555    }
3556
3557    #[test]
3558    fn duration_nanosecond_single_column() {
3559        required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
3560    }
3561
3562    #[test]
3563    fn interval_year_month_single_column() {
3564        required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
3565    }
3566
3567    #[test]
3568    fn interval_day_time_single_column() {
3569        required_and_optional::<IntervalDayTimeArray, _>(vec![
3570            IntervalDayTime::new(0, 1),
3571            IntervalDayTime::new(0, 3),
3572            IntervalDayTime::new(3, -2),
3573            IntervalDayTime::new(-200, 4),
3574        ]);
3575    }
3576
3577    #[test]
3578    #[should_panic(
3579        expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
3580    )]
3581    fn interval_month_day_nano_single_column() {
3582        required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
3583            IntervalMonthDayNano::new(0, 1, 5),
3584            IntervalMonthDayNano::new(0, 3, 2),
3585            IntervalMonthDayNano::new(3, -2, -5),
3586            IntervalMonthDayNano::new(-200, 4, -1),
3587        ]);
3588    }
3589
3590    #[test]
3591    fn binary_single_column() {
3592        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
3593        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
3594        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
3595
3596        // BinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
3597        values_required::<BinaryArray, _>(many_vecs_iter);
3598    }
3599
3600    #[test]
3601    fn binary_view_single_column() {
3602        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
3603        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
3604        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
3605
3606        // BinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
3607        values_required::<BinaryViewArray, _>(many_vecs_iter);
3608    }
3609
3610    #[test]
3611    fn i32_column_bloom_filter_at_end() {
3612        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
3613        let mut options = RoundTripOptions::new(array, false);
3614        options.bloom_filter = true;
3615        options.bloom_filter_position = BloomFilterPosition::End;
3616
3617        let files = one_column_roundtrip_with_options(options);
3618        check_bloom_filter(
3619            files,
3620            "col".to_string(),
3621            (0..SMALL_SIZE as i32).collect(),
3622            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
3623        );
3624    }
3625
3626    #[test]
3627    fn i32_column_bloom_filter() {
3628        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
3629        let mut options = RoundTripOptions::new(array, false);
3630        options.bloom_filter = true;
3631
3632        let files = one_column_roundtrip_with_options(options);
3633        check_bloom_filter(
3634            files,
3635            "col".to_string(),
3636            (0..SMALL_SIZE as i32).collect(),
3637            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
3638        );
3639    }
3640
3641    /// Test that bloom filter folding produces correct results even when
3642    /// the configured NDV differs significantly from actual NDV.
3643    /// A large NDV means a larger initial filter that gets folded down;
3644    /// a small NDV means a smaller initial filter.
3645    #[test]
3646    fn i32_column_bloom_filter_fixed_ndv() {
3647        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
3648
3649        // NDV much larger than actual distinct values — tests folding a large filter down
3650        let mut options = RoundTripOptions::new(array.clone(), false);
3651        options.bloom_filter = true;
3652        options.bloom_filter_ndv = Some(1_000_000);
3653
3654        let files = one_column_roundtrip_with_options(options);
3655        check_bloom_filter(
3656            files,
3657            "col".to_string(),
3658            (0..SMALL_SIZE as i32).collect(),
3659            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
3660        );
3661
3662        // NDV smaller than actual distinct values — tests the underestimate path
3663        let mut options = RoundTripOptions::new(array, false);
3664        options.bloom_filter = true;
3665        options.bloom_filter_ndv = Some(3);
3666
3667        let files = one_column_roundtrip_with_options(options);
3668        check_bloom_filter(
3669            files,
3670            "col".to_string(),
3671            (0..SMALL_SIZE as i32).collect(),
3672            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
3673        );
3674    }
3675
3676    #[test]
3677    fn binary_column_bloom_filter() {
3678        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
3679        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
3680        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
3681
3682        let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
3683        let mut options = RoundTripOptions::new(array, false);
3684        options.bloom_filter = true;
3685
3686        let files = one_column_roundtrip_with_options(options);
3687        check_bloom_filter(
3688            files,
3689            "col".to_string(),
3690            many_vecs,
3691            vec![vec![(SMALL_SIZE + 1) as u8]],
3692        );
3693    }
3694
3695    #[test]
3696    fn empty_string_null_column_bloom_filter() {
3697        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
3698        let raw_strs = raw_values.iter().map(|s| s.as_str());
3699
3700        let array = Arc::new(StringArray::from_iter_values(raw_strs));
3701        let mut options = RoundTripOptions::new(array, false);
3702        options.bloom_filter = true;
3703
3704        let files = one_column_roundtrip_with_options(options);
3705
3706        let optional_raw_values: Vec<_> = raw_values
3707            .iter()
3708            .enumerate()
3709            .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
3710            .collect();
3711        // For null slots, empty string should not be in bloom filter.
3712        check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
3713    }
3714
3715    #[test]
3716    fn large_binary_single_column() {
3717        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
3718        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
3719        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
3720
3721        // LargeBinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
3722        values_required::<LargeBinaryArray, _>(many_vecs_iter);
3723    }
3724
3725    #[test]
3726    fn fixed_size_binary_single_column() {
3727        let mut builder = FixedSizeBinaryBuilder::new(4);
3728        builder.append_value(b"0123").unwrap();
3729        builder.append_null();
3730        builder.append_value(b"8910").unwrap();
3731        builder.append_value(b"1112").unwrap();
3732        let array = Arc::new(builder.finish());
3733
3734        one_column_roundtrip(array, true);
3735    }
3736
3737    #[test]
3738    fn string_single_column() {
3739        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
3740        let raw_strs = raw_values.iter().map(|s| s.as_str());
3741
3742        required_and_optional::<StringArray, _>(raw_strs);
3743    }
3744
3745    #[test]
3746    fn large_string_single_column() {
3747        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
3748        let raw_strs = raw_values.iter().map(|s| s.as_str());
3749
3750        required_and_optional::<LargeStringArray, _>(raw_strs);
3751    }
3752
3753    #[test]
3754    fn string_view_single_column() {
3755        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
3756        let raw_strs = raw_values.iter().map(|s| s.as_str());
3757
3758        required_and_optional::<StringViewArray, _>(raw_strs);
3759    }
3760
3761    #[test]
3762    fn null_list_single_column() {
3763        let null_field = Field::new_list_field(DataType::Null, true);
3764        let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);
3765
3766        let schema = Schema::new(vec![list_field]);
3767
3768        // Build [[], null, [null, null]]
3769        let a_values = NullArray::new(2);
3770        let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
3771        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
3772            DataType::Null,
3773            true,
3774        ))))
3775        .len(3)
3776        .add_buffer(a_value_offsets)
3777        .null_bit_buffer(Some(Buffer::from([0b00000101])))
3778        .add_child_data(a_values.into_data())
3779        .build()
3780        .unwrap();
3781
3782        let a = ListArray::from(a_list_data);
3783
3784        assert!(a.is_valid(0));
3785        assert!(!a.is_valid(1));
3786        assert!(a.is_valid(2));
3787
3788        assert_eq!(a.value(0).len(), 0);
3789        assert_eq!(a.value(2).len(), 2);
3790        assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);
3791
3792        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
3793        roundtrip(batch, None);
3794    }
3795
3796    #[test]
3797    fn list_single_column() {
3798        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3799        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
3800        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
3801            DataType::Int32,
3802            false,
3803        ))))
3804        .len(5)
3805        .add_buffer(a_value_offsets)
3806        .null_bit_buffer(Some(Buffer::from([0b00011011])))
3807        .add_child_data(a_values.into_data())
3808        .build()
3809        .unwrap();
3810
3811        assert_eq!(a_list_data.null_count(), 1);
3812
3813        let a = ListArray::from(a_list_data);
3814        let values = Arc::new(a);
3815
3816        one_column_roundtrip(values, true);
3817    }
3818
3819    #[test]
3820    fn large_list_single_column() {
3821        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3822        let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
3823        let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
3824            "large_item",
3825            DataType::Int32,
3826            true,
3827        ))))
3828        .len(5)
3829        .add_buffer(a_value_offsets)
3830        .add_child_data(a_values.into_data())
3831        .null_bit_buffer(Some(Buffer::from([0b00011011])))
3832        .build()
3833        .unwrap();
3834
3835        // I think this setup is incorrect because this should pass
3836        assert_eq!(a_list_data.null_count(), 1);
3837
3838        let a = LargeListArray::from(a_list_data);
3839        let values = Arc::new(a);
3840
3841        one_column_roundtrip(values, true);
3842    }
3843
3844    #[test]
3845    fn list_nested_nulls() {
3846        use arrow::datatypes::Int32Type;
3847        let data = vec![
3848            Some(vec![Some(1)]),
3849            Some(vec![Some(2), Some(3)]),
3850            None,
3851            Some(vec![Some(4), Some(5), None]),
3852            Some(vec![None]),
3853            Some(vec![Some(6), Some(7)]),
3854        ];
3855
3856        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
3857        one_column_roundtrip(Arc::new(list), true);
3858
3859        let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
3860        one_column_roundtrip(Arc::new(list), true);
3861    }
3862
3863    #[test]
3864    fn struct_single_column() {
3865        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3866        let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
3867        let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
3868
3869        let values = Arc::new(s);
3870        one_column_roundtrip(values, false);
3871    }
3872
3873    #[test]
3874    fn list_and_map_coerced_names() {
3875        // Create map and list with non-Parquet naming
3876        let list_field =
3877            Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
3878        let map_field = Field::new_map(
3879            "my_map",
3880            "entries",
3881            Field::new("keys", DataType::Int32, false),
3882            Field::new("values", DataType::Int32, true),
3883            false,
3884            true,
3885        );
3886
3887        let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
3888        let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();
3889
3890        let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
3891
3892        // Write data to Parquet but coerce names to match spec
3893        let props = Some(WriterProperties::builder().set_coerce_types(true).build());
3894        let file = tempfile::tempfile().unwrap();
3895        let mut writer =
3896            ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();
3897
3898        let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
3899        writer.write(&batch).unwrap();
3900        let file_metadata = writer.close().unwrap();
3901
3902        let schema = file_metadata.file_metadata().schema();
3903        // Coerced name of "item" should be "element"
3904        let list_field = &schema.get_fields()[0].get_fields()[0];
3905        assert_eq!(list_field.get_fields()[0].name(), "element");
3906
3907        let map_field = &schema.get_fields()[1].get_fields()[0];
3908        // Coerced name of "entries" should be "key_value"
3909        assert_eq!(map_field.name(), "key_value");
3910        // Coerced name of "keys" should be "key"
3911        assert_eq!(map_field.get_fields()[0].name(), "key");
3912        // Coerced name of "values" should be "value"
3913        assert_eq!(map_field.get_fields()[1].name(), "value");
3914
3915        // Double check schema after reading from the file
3916        let reader = SerializedFileReader::new(file).unwrap();
3917        let file_schema = reader.metadata().file_metadata().schema();
3918        let fields = file_schema.get_fields();
3919        let list_field = &fields[0].get_fields()[0];
3920        assert_eq!(list_field.get_fields()[0].name(), "element");
3921        let map_field = &fields[1].get_fields()[0];
3922        assert_eq!(map_field.name(), "key_value");
3923        assert_eq!(map_field.get_fields()[0].name(), "key");
3924        assert_eq!(map_field.get_fields()[1].name(), "value");
3925    }
3926
3927    #[test]
3928    fn fallback_flush_data_page() {
3929        //tests if the Fallback::flush_data_page clears all buffers correctly
3930        let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
3931        let values = Arc::new(StringArray::from(raw_values));
3932        let encodings = vec![
3933            Encoding::DELTA_BYTE_ARRAY,
3934            Encoding::DELTA_LENGTH_BYTE_ARRAY,
3935        ];
3936        let data_type = values.data_type().clone();
3937        let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
3938        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3939
3940        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
3941        let data_page_size_limit: usize = 32;
3942        let write_batch_size: usize = 16;
3943
3944        for encoding in &encodings {
3945            for row_group_size in row_group_sizes {
3946                let props = WriterProperties::builder()
3947                    .set_writer_version(WriterVersion::PARQUET_2_0)
3948                    .set_max_row_group_row_count(Some(row_group_size))
3949                    .set_dictionary_enabled(false)
3950                    .set_encoding(*encoding)
3951                    .set_data_page_size_limit(data_page_size_limit)
3952                    .set_write_batch_size(write_batch_size)
3953                    .build();
3954
3955                roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
3956                    let string_array_a = StringArray::from(a.clone());
3957                    let string_array_b = StringArray::from(b.clone());
3958                    let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
3959                    let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
3960                    assert_eq!(
3961                        vec_a, vec_b,
3962                        "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
3963                    );
3964                });
3965            }
3966        }
3967    }
3968
3969    #[test]
3970    fn arrow_writer_string_dictionary() {
3971        // define schema
3972        #[allow(deprecated)]
3973        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3974            "dictionary",
3975            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3976            true,
3977            42,
3978            true,
3979        )]));
3980
3981        // create some data
3982        let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3983            .iter()
3984            .copied()
3985            .collect();
3986
3987        // build a record batch
3988        one_column_roundtrip_with_schema(Arc::new(d), schema);
3989    }
3990
3991    #[test]
3992    fn arrow_writer_test_type_compatibility() {
3993        fn ensure_compatible_write<T1, T2>(array1: T1, array2: T2, expected_result: T1)
3994        where
3995            T1: Array + 'static,
3996            T2: Array + 'static,
3997        {
3998            let schema1 = Arc::new(Schema::new(vec![Field::new(
3999                "a",
4000                array1.data_type().clone(),
4001                false,
4002            )]));
4003
4004            let file = tempfile().unwrap();
4005            let mut writer =
4006                ArrowWriter::try_new(file.try_clone().unwrap(), schema1.clone(), None).unwrap();
4007
4008            let rb1 = RecordBatch::try_new(schema1.clone(), vec![Arc::new(array1)]).unwrap();
4009            writer.write(&rb1).unwrap();
4010
4011            let schema2 = Arc::new(Schema::new(vec![Field::new(
4012                "a",
4013                array2.data_type().clone(),
4014                false,
4015            )]));
4016            let rb2 = RecordBatch::try_new(schema2, vec![Arc::new(array2)]).unwrap();
4017            writer.write(&rb2).unwrap();
4018
4019            writer.close().unwrap();
4020
4021            let mut record_batch_reader =
4022                ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
4023            let actual_batch = record_batch_reader.next().unwrap().unwrap();
4024
4025            let expected_batch =
4026                RecordBatch::try_new(schema1, vec![Arc::new(expected_result)]).unwrap();
4027            assert_eq!(actual_batch, expected_batch);
4028        }
4029
4030        // check compatibility between native and dictionaries
4031
4032        ensure_compatible_write(
4033            DictionaryArray::new(
4034                UInt8Array::from_iter_values(vec![0]),
4035                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
4036            ),
4037            StringArray::from_iter_values(vec!["barquet"]),
4038            DictionaryArray::new(
4039                UInt8Array::from_iter_values(vec![0, 1]),
4040                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
4041            ),
4042        );
4043
4044        ensure_compatible_write(
4045            StringArray::from_iter_values(vec!["parquet"]),
4046            DictionaryArray::new(
4047                UInt8Array::from_iter_values(vec![0]),
4048                Arc::new(StringArray::from_iter_values(vec!["barquet"])),
4049            ),
4050            StringArray::from_iter_values(vec!["parquet", "barquet"]),
4051        );
4052
4053        // check compatibility between dictionaries with different key types
4054
4055        ensure_compatible_write(
4056            DictionaryArray::new(
4057                UInt8Array::from_iter_values(vec![0]),
4058                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
4059            ),
4060            DictionaryArray::new(
4061                UInt16Array::from_iter_values(vec![0]),
4062                Arc::new(StringArray::from_iter_values(vec!["barquet"])),
4063            ),
4064            DictionaryArray::new(
4065                UInt8Array::from_iter_values(vec![0, 1]),
4066                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
4067            ),
4068        );
4069
4070        // check compatibility between dictionaries with different value types
4071        ensure_compatible_write(
4072            DictionaryArray::new(
4073                UInt8Array::from_iter_values(vec![0]),
4074                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
4075            ),
4076            DictionaryArray::new(
4077                UInt8Array::from_iter_values(vec![0]),
4078                Arc::new(LargeStringArray::from_iter_values(vec!["barquet"])),
4079            ),
4080            DictionaryArray::new(
4081                UInt8Array::from_iter_values(vec![0, 1]),
4082                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
4083            ),
4084        );
4085
4086        // check compatibility between a dictionary and a native array with a different type
4087        ensure_compatible_write(
4088            DictionaryArray::new(
4089                UInt8Array::from_iter_values(vec![0]),
4090                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
4091            ),
4092            LargeStringArray::from_iter_values(vec!["barquet"]),
4093            DictionaryArray::new(
4094                UInt8Array::from_iter_values(vec![0, 1]),
4095                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
4096            ),
4097        );
4098
4099        // check compatibility for string types
4100
4101        ensure_compatible_write(
4102            StringArray::from_iter_values(vec!["parquet"]),
4103            LargeStringArray::from_iter_values(vec!["barquet"]),
4104            StringArray::from_iter_values(vec!["parquet", "barquet"]),
4105        );
4106
4107        ensure_compatible_write(
4108            LargeStringArray::from_iter_values(vec!["parquet"]),
4109            StringArray::from_iter_values(vec!["barquet"]),
4110            LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
4111        );
4112
4113        ensure_compatible_write(
4114            StringArray::from_iter_values(vec!["parquet"]),
4115            StringViewArray::from_iter_values(vec!["barquet"]),
4116            StringArray::from_iter_values(vec!["parquet", "barquet"]),
4117        );
4118
4119        ensure_compatible_write(
4120            StringViewArray::from_iter_values(vec!["parquet"]),
4121            StringArray::from_iter_values(vec!["barquet"]),
4122            StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
4123        );
4124
4125        ensure_compatible_write(
4126            LargeStringArray::from_iter_values(vec!["parquet"]),
4127            StringViewArray::from_iter_values(vec!["barquet"]),
4128            LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
4129        );
4130
4131        ensure_compatible_write(
4132            StringViewArray::from_iter_values(vec!["parquet"]),
4133            LargeStringArray::from_iter_values(vec!["barquet"]),
4134            StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
4135        );
4136
4137        // check compatibility for binary types
4138
4139        ensure_compatible_write(
4140            BinaryArray::from_iter_values(vec![b"parquet"]),
4141            LargeBinaryArray::from_iter_values(vec![b"barquet"]),
4142            BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
4143        );
4144
4145        ensure_compatible_write(
4146            LargeBinaryArray::from_iter_values(vec![b"parquet"]),
4147            BinaryArray::from_iter_values(vec![b"barquet"]),
4148            LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
4149        );
4150
4151        ensure_compatible_write(
4152            BinaryArray::from_iter_values(vec![b"parquet"]),
4153            BinaryViewArray::from_iter_values(vec![b"barquet"]),
4154            BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
4155        );
4156
4157        ensure_compatible_write(
4158            BinaryViewArray::from_iter_values(vec![b"parquet"]),
4159            BinaryArray::from_iter_values(vec![b"barquet"]),
4160            BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
4161        );
4162
4163        ensure_compatible_write(
4164            BinaryViewArray::from_iter_values(vec![b"parquet"]),
4165            LargeBinaryArray::from_iter_values(vec![b"barquet"]),
4166            BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
4167        );
4168
4169        ensure_compatible_write(
4170            LargeBinaryArray::from_iter_values(vec![b"parquet"]),
4171            BinaryViewArray::from_iter_values(vec![b"barquet"]),
4172            LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
4173        );
4174
4175        // check compatibility for list types
4176
4177        let list_field_metadata = HashMap::from_iter(vec![(
4178            PARQUET_FIELD_ID_META_KEY.to_string(),
4179            "1".to_string(),
4180        )]);
4181        let list_field = Field::new_list_field(DataType::Int32, false);
4182
4183        let values1 = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4]));
4184        let offsets1 = OffsetBuffer::new(vec![0, 2, 5].into());
4185
4186        let values2 = Arc::new(Int32Array::from(vec![5, 6, 7, 8, 9]));
4187        let offsets2 = OffsetBuffer::new(vec![0, 3, 5].into());
4188
4189        let values_expected = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]));
4190        let offsets_expected = OffsetBuffer::new(vec![0, 2, 5, 8, 10].into());
4191
4192        ensure_compatible_write(
4193            // when the initial schema has the metadata ...
4194            ListArray::try_new(
4195                Arc::new(
4196                    list_field
4197                        .clone()
4198                        .with_metadata(list_field_metadata.clone()),
4199                ),
4200                offsets1,
4201                values1,
4202                None,
4203            )
4204            .unwrap(),
4205            // ... and some intermediate schema doesn't have the metadata
4206            ListArray::try_new(Arc::new(list_field.clone()), offsets2, values2, None).unwrap(),
4207            // ... the write will still go through, and the resulting schema will inherit the initial metadata
4208            ListArray::try_new(
4209                Arc::new(
4210                    list_field
4211                        .clone()
4212                        .with_metadata(list_field_metadata.clone()),
4213                ),
4214                offsets_expected,
4215                values_expected,
4216                None,
4217            )
4218            .unwrap(),
4219        );
4220    }
4221
4222    #[test]
4223    fn arrow_writer_primitive_dictionary() {
4224        // define schema
4225        #[allow(deprecated)]
4226        let schema = Arc::new(Schema::new(vec![Field::new_dict(
4227            "dictionary",
4228            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
4229            true,
4230            42,
4231            true,
4232        )]));
4233
4234        // create some data
4235        let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
4236        builder.append(12345678).unwrap();
4237        builder.append_null();
4238        builder.append(22345678).unwrap();
4239        builder.append(12345678).unwrap();
4240        let d = builder.finish();
4241
4242        one_column_roundtrip_with_schema(Arc::new(d), schema);
4243    }
4244
4245    #[test]
4246    fn arrow_writer_decimal32_dictionary() {
4247        let integers = vec![12345, 56789, 34567];
4248
4249        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
4250
4251        let values = Decimal32Array::from(integers.clone())
4252            .with_precision_and_scale(5, 2)
4253            .unwrap();
4254
4255        let array = DictionaryArray::new(keys, Arc::new(values));
4256        one_column_roundtrip(Arc::new(array.clone()), true);
4257
4258        let values = Decimal32Array::from(integers)
4259            .with_precision_and_scale(9, 2)
4260            .unwrap();
4261
4262        let array = array.with_values(Arc::new(values));
4263        one_column_roundtrip(Arc::new(array), true);
4264    }
4265
4266    #[test]
4267    fn arrow_writer_decimal64_dictionary() {
4268        let integers = vec![12345, 56789, 34567];
4269
4270        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
4271
4272        let values = Decimal64Array::from(integers.clone())
4273            .with_precision_and_scale(5, 2)
4274            .unwrap();
4275
4276        let array = DictionaryArray::new(keys, Arc::new(values));
4277        one_column_roundtrip(Arc::new(array.clone()), true);
4278
4279        let values = Decimal64Array::from(integers)
4280            .with_precision_and_scale(12, 2)
4281            .unwrap();
4282
4283        let array = array.with_values(Arc::new(values));
4284        one_column_roundtrip(Arc::new(array), true);
4285    }
4286
4287    #[test]
4288    fn arrow_writer_decimal128_dictionary() {
4289        let integers = vec![12345, 56789, 34567];
4290
4291        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
4292
4293        let values = Decimal128Array::from(integers.clone())
4294            .with_precision_and_scale(5, 2)
4295            .unwrap();
4296
4297        let array = DictionaryArray::new(keys, Arc::new(values));
4298        one_column_roundtrip(Arc::new(array.clone()), true);
4299
4300        let values = Decimal128Array::from(integers)
4301            .with_precision_and_scale(12, 2)
4302            .unwrap();
4303
4304        let array = array.with_values(Arc::new(values));
4305        one_column_roundtrip(Arc::new(array), true);
4306    }
4307
4308    #[test]
4309    fn arrow_writer_decimal256_dictionary() {
4310        let integers = vec![
4311            i256::from_i128(12345),
4312            i256::from_i128(56789),
4313            i256::from_i128(34567),
4314        ];
4315
4316        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
4317
4318        let values = Decimal256Array::from(integers.clone())
4319            .with_precision_and_scale(5, 2)
4320            .unwrap();
4321
4322        let array = DictionaryArray::new(keys, Arc::new(values));
4323        one_column_roundtrip(Arc::new(array.clone()), true);
4324
4325        let values = Decimal256Array::from(integers)
4326            .with_precision_and_scale(12, 2)
4327            .unwrap();
4328
4329        let array = array.with_values(Arc::new(values));
4330        one_column_roundtrip(Arc::new(array), true);
4331    }
4332
4333    #[test]
4334    fn arrow_writer_string_dictionary_unsigned_index() {
4335        // define schema
4336        #[allow(deprecated)]
4337        let schema = Arc::new(Schema::new(vec![Field::new_dict(
4338            "dictionary",
4339            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
4340            true,
4341            42,
4342            true,
4343        )]));
4344
4345        // create some data
4346        let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
4347            .iter()
4348            .copied()
4349            .collect();
4350
4351        one_column_roundtrip_with_schema(Arc::new(d), schema);
4352    }
4353
4354    #[test]
4355    fn u32_min_max() {
4356        // check values roundtrip through parquet
4357        let src = [
4358            u32::MIN,
4359            u32::MIN + 1,
4360            (i32::MAX as u32) - 1,
4361            i32::MAX as u32,
4362            (i32::MAX as u32) + 1,
4363            u32::MAX - 1,
4364            u32::MAX,
4365        ];
4366        let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
4367        let files = one_column_roundtrip(values, false);
4368
4369        for file in files {
4370            // check statistics are valid
4371            let reader = SerializedFileReader::new(file).unwrap();
4372            let metadata = reader.metadata();
4373
4374            let mut row_offset = 0;
4375            for row_group in metadata.row_groups() {
4376                assert_eq!(row_group.num_columns(), 1);
4377                let column = row_group.column(0);
4378
4379                let num_values = column.num_values() as usize;
4380                let src_slice = &src[row_offset..row_offset + num_values];
4381                row_offset += column.num_values() as usize;
4382
4383                let stats = column.statistics().unwrap();
4384                if let Statistics::Int32(stats) = stats {
4385                    assert_eq!(
4386                        *stats.min_opt().unwrap() as u32,
4387                        *src_slice.iter().min().unwrap()
4388                    );
4389                    assert_eq!(
4390                        *stats.max_opt().unwrap() as u32,
4391                        *src_slice.iter().max().unwrap()
4392                    );
4393                } else {
4394                    panic!("Statistics::Int32 missing")
4395                }
4396            }
4397        }
4398    }
4399
4400    #[test]
4401    fn u64_min_max() {
4402        // check values roundtrip through parquet
4403        let src = [
4404            u64::MIN,
4405            u64::MIN + 1,
4406            (i64::MAX as u64) - 1,
4407            i64::MAX as u64,
4408            (i64::MAX as u64) + 1,
4409            u64::MAX - 1,
4410            u64::MAX,
4411        ];
4412        let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
4413        let files = one_column_roundtrip(values, false);
4414
4415        for file in files {
4416            // check statistics are valid
4417            let reader = SerializedFileReader::new(file).unwrap();
4418            let metadata = reader.metadata();
4419
4420            let mut row_offset = 0;
4421            for row_group in metadata.row_groups() {
4422                assert_eq!(row_group.num_columns(), 1);
4423                let column = row_group.column(0);
4424
4425                let num_values = column.num_values() as usize;
4426                let src_slice = &src[row_offset..row_offset + num_values];
4427                row_offset += column.num_values() as usize;
4428
4429                let stats = column.statistics().unwrap();
4430                if let Statistics::Int64(stats) = stats {
4431                    assert_eq!(
4432                        *stats.min_opt().unwrap() as u64,
4433                        *src_slice.iter().min().unwrap()
4434                    );
4435                    assert_eq!(
4436                        *stats.max_opt().unwrap() as u64,
4437                        *src_slice.iter().max().unwrap()
4438                    );
4439                } else {
4440                    panic!("Statistics::Int64 missing")
4441                }
4442            }
4443        }
4444    }
4445
4446    #[test]
4447    fn statistics_null_counts_only_nulls() {
4448        // check that null-count statistics for "only NULL"-columns are correct
4449        let values = Arc::new(UInt64Array::from(vec![None, None]));
4450        let files = one_column_roundtrip(values, true);
4451
4452        for file in files {
4453            // check statistics are valid
4454            let reader = SerializedFileReader::new(file).unwrap();
4455            let metadata = reader.metadata();
4456            assert_eq!(metadata.num_row_groups(), 1);
4457            let row_group = metadata.row_group(0);
4458            assert_eq!(row_group.num_columns(), 1);
4459            let column = row_group.column(0);
4460            let stats = column.statistics().unwrap();
4461            assert_eq!(stats.null_count_opt(), Some(2));
4462        }
4463    }
4464
4465    #[test]
4466    fn test_list_of_struct_roundtrip() {
4467        // define schema
4468        let int_field = Field::new("a", DataType::Int32, true);
4469        let int_field2 = Field::new("b", DataType::Int32, true);
4470
4471        let int_builder = Int32Builder::with_capacity(10);
4472        let int_builder2 = Int32Builder::with_capacity(10);
4473
4474        let struct_builder = StructBuilder::new(
4475            vec![int_field, int_field2],
4476            vec![Box::new(int_builder), Box::new(int_builder2)],
4477        );
4478        let mut list_builder = ListBuilder::new(struct_builder);
4479
4480        // Construct the following array
4481        // [{a: 1, b: 2}], [], null, [null, null], [{a: null, b: 3}], [{a: 2, b: null}]
4482
4483        // [{a: 1, b: 2}]
4484        let values = list_builder.values();
4485        values
4486            .field_builder::<Int32Builder>(0)
4487            .unwrap()
4488            .append_value(1);
4489        values
4490            .field_builder::<Int32Builder>(1)
4491            .unwrap()
4492            .append_value(2);
4493        values.append(true);
4494        list_builder.append(true);
4495
4496        // []
4497        list_builder.append(true);
4498
4499        // null
4500        list_builder.append(false);
4501
4502        // [null, null]
4503        let values = list_builder.values();
4504        values
4505            .field_builder::<Int32Builder>(0)
4506            .unwrap()
4507            .append_null();
4508        values
4509            .field_builder::<Int32Builder>(1)
4510            .unwrap()
4511            .append_null();
4512        values.append(false);
4513        values
4514            .field_builder::<Int32Builder>(0)
4515            .unwrap()
4516            .append_null();
4517        values
4518            .field_builder::<Int32Builder>(1)
4519            .unwrap()
4520            .append_null();
4521        values.append(false);
4522        list_builder.append(true);
4523
4524        // [{a: null, b: 3}]
4525        let values = list_builder.values();
4526        values
4527            .field_builder::<Int32Builder>(0)
4528            .unwrap()
4529            .append_null();
4530        values
4531            .field_builder::<Int32Builder>(1)
4532            .unwrap()
4533            .append_value(3);
4534        values.append(true);
4535        list_builder.append(true);
4536
4537        // [{a: 2, b: null}]
4538        let values = list_builder.values();
4539        values
4540            .field_builder::<Int32Builder>(0)
4541            .unwrap()
4542            .append_value(2);
4543        values
4544            .field_builder::<Int32Builder>(1)
4545            .unwrap()
4546            .append_null();
4547        values.append(true);
4548        list_builder.append(true);
4549
4550        let array = Arc::new(list_builder.finish());
4551
4552        one_column_roundtrip(array, true);
4553    }
4554
4555    fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
4556        metadata.row_groups().iter().map(|x| x.num_rows()).collect()
4557    }
4558
4559    #[test]
4560    fn test_aggregates_records() {
4561        let arrays = [
4562            Int32Array::from((0..100).collect::<Vec<_>>()),
4563            Int32Array::from((0..50).collect::<Vec<_>>()),
4564            Int32Array::from((200..500).collect::<Vec<_>>()),
4565        ];
4566
4567        let schema = Arc::new(Schema::new(vec![Field::new(
4568            "int",
4569            ArrowDataType::Int32,
4570            false,
4571        )]));
4572
4573        let file = tempfile::tempfile().unwrap();
4574
4575        let props = WriterProperties::builder()
4576            .set_max_row_group_row_count(Some(200))
4577            .build();
4578
4579        let mut writer =
4580            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
4581
4582        for array in arrays {
4583            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
4584            writer.write(&batch).unwrap();
4585        }
4586
4587        writer.close().unwrap();
4588
4589        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4590        assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);
4591
4592        let batches = builder
4593            .with_batch_size(100)
4594            .build()
4595            .unwrap()
4596            .collect::<ArrowResult<Vec<_>>>()
4597            .unwrap();
4598
4599        assert_eq!(batches.len(), 5);
4600        assert!(batches.iter().all(|x| x.num_columns() == 1));
4601
4602        let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
4603
4604        assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
4605
4606        let values: Vec<_> = batches
4607            .iter()
4608            .flat_map(|x| {
4609                x.column(0)
4610                    .as_any()
4611                    .downcast_ref::<Int32Array>()
4612                    .unwrap()
4613                    .values()
4614                    .iter()
4615                    .cloned()
4616            })
4617            .collect();
4618
4619        let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
4620        assert_eq!(&values, &expected_values)
4621    }
4622
#[test]
fn complex_aggregate() {
    // Aggregation across record batches must also work for nested data:
    // struct_b (non-null) -> list (nullable) -> struct_a (nullable) -> {leaf_a, leaf_b}.
    let leaf_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
    let leaf_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
    let item_field = Arc::new(Field::new(
        "struct_a",
        DataType::Struct(vec![leaf_a.clone(), leaf_b.clone()].into()),
        true,
    ));
    let list_field = Arc::new(Field::new("list", DataType::List(item_field), true));
    let root_field = Arc::new(Field::new(
        "struct_b",
        DataType::Struct(vec![list_field.clone()].into()),
        false,
    ));
    let schema = Arc::new(Schema::new(vec![root_field]));

    // First batch: five list slots, slots 1 and 3 null. The offsets stop at
    // child index 5, so the sixth struct_a entry is never referenced.
    let first = {
        let items = StructArray::from(vec![
            (
                leaf_a.clone(),
                Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6])) as ArrayRef,
            ),
            (
                leaf_b.clone(),
                Arc::new(Int32Array::from_iter(vec![
                    Some(1),
                    None,
                    Some(2),
                    None,
                    None,
                    Some(6),
                ])) as ArrayRef,
            ),
        ]);
        let list_data = ArrayDataBuilder::new(list_field.data_type().clone())
            .len(5)
            .add_buffer(Buffer::from_iter(vec![0_i32, 1, 1, 3, 3, 5]))
            .null_bit_buffer(Some(Buffer::from_iter(vec![
                true, false, true, false, true,
            ])))
            .child_data(vec![items.into_data()])
            .build()
            .unwrap();
        let list = Arc::new(ListArray::from(list_data)) as ArrayRef;
        let root = StructArray::from(vec![(list_field.clone(), list)]);
        RecordBatch::try_from_iter(vec![("struct_b", Arc::new(root) as ArrayRef)]).unwrap()
    };

    // Second batch: two non-null list slots holding 4 and 1 entries.
    let second = {
        let items = StructArray::from(vec![
            (
                leaf_a,
                Arc::new(Int32Array::from(vec![6, 7, 8, 9, 10])) as ArrayRef,
            ),
            (
                leaf_b,
                Arc::new(Int32Array::from_iter(vec![None, None, None, Some(1), None]))
                    as ArrayRef,
            ),
        ]);
        let list_data = ArrayDataBuilder::new(list_field.data_type().clone())
            .len(2)
            .add_buffer(Buffer::from_iter(vec![0_i32, 4, 5]))
            .child_data(vec![items.into_data()])
            .build()
            .unwrap();
        let list = Arc::new(ListArray::from(list_data)) as ArrayRef;
        let root = StructArray::from(vec![(list_field, list)]);
        RecordBatch::try_from_iter(vec![("struct_b", Arc::new(root) as ArrayRef)]).unwrap()
    };

    let batches = &[first, second];

    // Sanity-check the input data before writing it.
    let expected = r#"
        +-------------------------------------------------------------------------------------------------------+
        | struct_b                                                                                              |
        +-------------------------------------------------------------------------------------------------------+
        | {list: [{leaf_a: 1, leaf_b: 1}]}                                                                      |
        | {list: }                                                                                              |
        | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]}                                               |
        | {list: }                                                                                              |
        | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]}                                                |
        | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
        | {list: [{leaf_a: 10, leaf_b: }]}                                                                      |
        +-------------------------------------------------------------------------------------------------------+
    "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");
    assert_eq!(pretty_format_batches(batches).unwrap().to_string(), expected);

    // With a 6-row cap, the 5 rows of the first batch plus the first row of
    // the second fill row group 0, which leaves 1 row for row group 1.
    let file = tempfile::tempfile().unwrap();
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(6))
        .build();
    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
    for batch in batches {
        writer.write(batch).unwrap();
    }
    writer.close().unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);

    let read_back = builder
        .with_batch_size(2)
        .build()
        .unwrap()
        .collect::<ArrowResult<Vec<_>>>()
        .unwrap();
    assert_eq!(read_back.len(), 4);
    let row_counts: Vec<_> = read_back.iter().map(|b| b.num_rows()).collect();
    assert_eq!(&row_counts, &[2, 2, 2, 1]);

    // The nested data must survive the round trip unchanged.
    assert_eq!(pretty_format_batches(&read_back).unwrap().to_string(), expected);
}
4750
#[test]
fn test_arrow_writer_metadata() {
    // A batch whose schema differs from the file schema only in schema-level
    // metadata must be accepted by the writer. The file schema's metadata must
    // then be visible when the file is read back.
    let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
    let file_schema = batch_schema.clone().with_metadata(
        vec![("foo".to_string(), "bar".to_string())]
            .into_iter()
            .collect(),
    );

    let batch = RecordBatch::try_new(
        Arc::new(batch_schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
    )
    .unwrap();

    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    // Verify the round trip: the schema metadata written via the file schema
    // is restored, and the column data is unchanged.
    let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buf)).unwrap();
    assert_eq!(
        builder.schema().metadata().get("foo").map(String::as_str),
        Some("bar")
    );
    let read_back = builder
        .build()
        .unwrap()
        .collect::<ArrowResult<Vec<_>>>()
        .unwrap();
    assert_eq!(read_back.len(), 1);
    assert_eq!(read_back[0].column(0).as_ref(), batch.column(0).as_ref());
}
4771
#[test]
fn test_arrow_writer_nullable() {
    // A non-nullable batch column may be written into a file schema that
    // declares it nullable. The data round-trips, and the schema read back is
    // the file schema rather than the batch schema.
    let file_schema = Arc::new(Schema::new(vec![Field::new(
        "int32",
        DataType::Int32,
        true,
    )]));
    let batch_schema = Arc::new(Schema::new(vec![Field::new(
        "int32",
        DataType::Int32,
        false,
    )]));

    let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    let batch = RecordBatch::try_new(batch_schema, vec![values]).unwrap();

    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
    let read_back = reader.next().unwrap().unwrap();
    assert_eq!(read_back.schema(), file_schema);
    assert_ne!(read_back.schema(), batch.schema());
    assert_eq!(read_back.column(0).as_ref(), batch.column(0).as_ref());
}
4795
#[test]
fn in_progress_accounting() {
    // Tracks how the writer's size/row/memory counters evolve across writes
    // and a flush.
    let batch = RecordBatch::try_new(
        Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]))],
    )
    .unwrap();

    let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();

    // Nothing buffered yet; only the initial 4-byte header has been written.
    assert_eq!(writer.in_progress_size(), 0);
    assert_eq!(writer.in_progress_rows(), 0);
    assert_eq!(writer.memory_size(), 0);
    assert_eq!(writer.bytes_written(), 4);

    // First write: all counters become non-zero.
    writer.write(&batch).unwrap();
    let initial_size = writer.in_progress_size();
    assert!(initial_size > 0);
    assert_eq!(writer.in_progress_rows(), 5);
    let initial_memory = writer.memory_size();
    assert!(initial_memory > 0);
    // The memory estimate is at least as large as the estimated encoded size.
    assert!(
        initial_size <= initial_memory,
        "{initial_size} <= {initial_memory}"
    );

    // Second write: every counter grows, and the same ordering still holds.
    writer.write(&batch).unwrap();
    assert!(writer.in_progress_size() > initial_size);
    assert_eq!(writer.in_progress_rows(), 10);
    assert!(writer.memory_size() > initial_memory);
    let (size_now, memory_now) = (writer.in_progress_size(), writer.memory_size());
    assert!(
        size_now <= memory_now,
        "in_progress_size {} <= memory_size {}",
        size_now,
        memory_now
    );

    // Flushing resets the in-progress counters while bytes_written grows.
    let bytes_before_flush = writer.bytes_written();
    writer.flush().unwrap();
    assert_eq!(writer.in_progress_size(), 0);
    assert_eq!(writer.memory_size(), 0);
    assert!(writer.bytes_written() > bytes_before_flush);

    writer.close().unwrap();
}
4849
#[test]
fn test_writer_all_null() {
    // Column "b" is entirely null. In the offset index, both columns must
    // still report exactly one page.
    let all_valid: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
    let all_null: ArrayRef = Arc::new(Int32Array::new(
        vec![0; 5].into(),
        Some(NullBuffer::new_null(5)),
    ));
    let batch = RecordBatch::try_from_iter(vec![("a", all_valid), ("b", all_null)]).unwrap();

    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
    let offset_index = reader.metadata().offset_index().unwrap();

    // One row group containing two columns, each with a single page.
    assert_eq!(offset_index.len(), 1);
    let columns = &offset_index[0];
    assert_eq!(columns.len(), 2);
    for column in columns.iter() {
        assert_eq!(column.page_locations().len(), 1);
    }
}
4875
#[test]
fn test_disabled_statistics_with_page() {
    // Statistics are disabled globally but enabled at page level for "a".
    // "a" gets chunk statistics and a column index; "b" gets neither.
    let file_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, true),
        Field::new("b", DataType::Utf8, true),
    ]));
    let columns: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
        Arc::new(StringArray::from(vec!["w", "x", "y", "z"])),
    ];
    let batch = RecordBatch::try_new(file_schema.clone(), columns).unwrap();

    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::None)
        .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
        .build();

    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
    writer.write(&batch).unwrap();
    let metadata = writer.close().unwrap();

    assert_eq!(metadata.num_row_groups(), 1);
    let written = metadata.row_group(0);
    assert_eq!(written.num_columns(), 2);
    let (a_meta, b_meta) = (written.column(0), written.column(1));
    // "a" requested page statistics, so it has both an offset and a column index.
    assert!(a_meta.offset_index_offset().is_some());
    assert!(a_meta.column_index_offset().is_some());
    // "b" has an offset index but no column index.
    assert!(b_meta.offset_index_offset().is_some());
    assert!(b_meta.column_index_offset().is_none());

    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();

    let row_group = reader.get_row_group(0).unwrap();
    let a_col = row_group.metadata().column(0);
    let b_col = row_group.metadata().column(1);

    // Chunk-level statistics are present for "a" only.
    match a_col.statistics().unwrap() {
        Statistics::ByteArray(stats) => {
            assert_eq!(stats.min_opt().unwrap().as_bytes(), b"a");
            assert_eq!(stats.max_opt().unwrap().as_bytes(), b"d");
        }
        _ => panic!("expecting Statistics::ByteArray"),
    }
    assert!(b_col.statistics().is_none());

    let file_metadata = reader.metadata();
    let offset_index = file_metadata.offset_index().unwrap();
    assert_eq!(offset_index.len(), 1);
    assert_eq!(offset_index[0].len(), 2);

    let column_index = file_metadata.column_index().unwrap();
    assert_eq!(column_index.len(), 1);
    assert_eq!(column_index[0].len(), 2);

    let a_idx = &column_index[0][0];
    assert!(
        matches!(a_idx, ColumnIndexMetaData::BYTE_ARRAY(_)),
        "{a_idx:?}"
    );
    let b_idx = &column_index[0][1];
    assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
}
4950
#[test]
fn test_disabled_statistics_with_chunk() {
    // Statistics are disabled globally but enabled at chunk level for "a".
    // "a" gets chunk statistics but no column index; "b" gets neither.
    let file_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, true),
        Field::new("b", DataType::Utf8, true),
    ]));
    let columns: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
        Arc::new(StringArray::from(vec!["w", "x", "y", "z"])),
    ];
    let batch = RecordBatch::try_new(file_schema.clone(), columns).unwrap();

    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::None)
        .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
        .build();

    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
    writer.write(&batch).unwrap();
    let metadata = writer.close().unwrap();

    assert_eq!(metadata.num_row_groups(), 1);
    let written = metadata.row_group(0);
    assert_eq!(written.num_columns(), 2);
    // Neither column collects page statistics, so each has an offset index
    // but no column index.
    for column in [written.column(0), written.column(1)] {
        assert!(column.offset_index_offset().is_some());
        assert!(column.column_index_offset().is_none());
    }

    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();

    let row_group = reader.get_row_group(0).unwrap();
    let a_col = row_group.metadata().column(0);
    let b_col = row_group.metadata().column(1);

    // Chunk-level statistics are present for "a" only.
    match a_col.statistics().unwrap() {
        Statistics::ByteArray(stats) => {
            assert_eq!(stats.min_opt().unwrap().as_bytes(), b"a");
            assert_eq!(stats.max_opt().unwrap().as_bytes(), b"d");
        }
        _ => panic!("expecting Statistics::ByteArray"),
    }
    assert!(b_col.statistics().is_none());

    let column_index = reader.metadata().column_index().unwrap();
    assert_eq!(column_index.len(), 1);
    assert_eq!(column_index[0].len(), 2);

    let a_idx = &column_index[0][0];
    assert!(matches!(a_idx, ColumnIndexMetaData::NONE), "{a_idx:?}");
    let b_idx = &column_index[0][1];
    assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
}
5018
#[test]
fn test_arrow_writer_skip_metadata() {
    // With `skip_arrow_metadata`, the embedded arrow schema key must not appear
    // in the file's key/value metadata. The schema inferred on read must still
    // match the one written.
    let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
    let file_schema = Arc::new(batch_schema.clone());
    let batch = RecordBatch::try_new(
        Arc::new(batch_schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
    )
    .unwrap();

    let options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
    let mut buf = Vec::with_capacity(1024);
    let mut writer =
        ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), options).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let reader_builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buf)).unwrap();
    assert_eq!(file_schema, *reader_builder.schema());

    // Absent key/value metadata trivially satisfies the check.
    let embeds_arrow_schema = reader_builder
        .metadata()
        .file_metadata()
        .key_value_metadata()
        .is_some_and(|kvs| {
            kvs.iter()
                .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY)
        });
    assert!(!embeds_arrow_schema);
}
5052
#[test]
fn test_arrow_writer_skip_path_in_schema() {
    // By default `path_in_schema` is written. Turning it off through the
    // writer properties must produce a smaller file.
    let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
    let file_schema = Arc::new(batch_schema.clone());
    let batch = RecordBatch::try_new(
        Arc::new(batch_schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
    )
    .unwrap();

    // Writes `batch` with the given options and returns the encoded file bytes.
    let encode = |options: ArrowWriterOptions| {
        let mut buf = Vec::with_capacity(1024);
        let mut writer =
            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), options).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        buf
    };

    let with_path = encode(ArrowWriterOptions::new());
    let without_path = encode(
        ArrowWriterOptions::new().with_properties(
            WriterProperties::builder()
                .set_write_path_in_schema(false)
                .build(),
        ),
    );

    assert!(with_path.len() > without_path.len());
}
5090
#[test]
fn mismatched_schemas() {
    // A batch column whose type disagrees with the file schema must be rejected
    // with a descriptive error.
    let file_schema = Arc::new(Schema::new(vec![Field::new(
        "temperature",
        DataType::Float64,
        false,
    )]));
    let batch = RecordBatch::try_new(
        Arc::new(Schema::new(vec![Field::new("count", DataType::Int32, false)])),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
    )
    .unwrap();

    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();

    let message = writer.write(&batch).unwrap_err().to_string();
    assert_eq!(
        message,
        "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
    );
}
5115
#[test]
// https://github.com/apache/arrow-rs/issues/6988
fn test_roundtrip_empty_schema() {
    // A batch with no columns and no rows must survive a write/read round trip.
    let options = RecordBatchOptions::default().with_row_count(Some(0));
    let empty_batch =
        RecordBatch::try_new_with_options(Arc::new(Schema::empty()), vec![], &options).unwrap();

    let mut parquet_bytes: Vec<u8> = Vec::new();
    let mut writer =
        ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
    writer.write(&empty_batch).unwrap();
    writer.close().unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(parquet_bytes)).unwrap();
    assert_eq!(builder.schema(), &empty_batch.schema());

    // With zero rows, reading produces no batches at all.
    let read_back = builder
        .build()
        .unwrap()
        .collect::<ArrowResult<Vec<_>>>()
        .unwrap();
    assert!(read_back.is_empty());
}
5145
#[test]
fn test_page_stats_not_written_by_default() {
    // Page statistics are collected (EnabledStatistics::Page), but without
    // `set_write_page_header_statistics(true)` they must not appear in the
    // data page header.
    let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
    let values = StringArray::from(vec!["Blart Versenwald III"]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(values)]).unwrap();

    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Page)
        .set_dictionary_enabled(false)
        .set_encoding(Encoding::PLAIN)
        .set_compression(crate::basic::Compression::UNCOMPRESSED)
        .build();
    let file = roundtrip_opts(&batch, props);

    // No Rust API exposes page-header statistics, so the first page header,
    // which follows the 4-byte file header, is decoded with the thrift reader.
    let mut prot = ThriftSliceInputProtocol::new(&file[4..]);
    let header = PageHeader::read_thrift(&mut prot).unwrap();
    assert!(header.data_page_header.unwrap().statistics.is_none());
}
5174
#[test]
fn test_page_stats_when_enabled() {
    // Opting in with `set_write_page_header_statistics(true)` must place exact
    // min/max values in the data page header.
    let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
    let values = StringArray::from(vec!["Blart Versenwald III", "Andrew Lamb"]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(values)]).unwrap();

    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Page)
        .set_dictionary_enabled(false)
        .set_encoding(Encoding::PLAIN)
        .set_write_page_header_statistics(true)
        .set_compression(crate::basic::Compression::UNCOMPRESSED)
        .build();
    let file = roundtrip_opts(&batch, props);

    // No Rust API exposes page-header statistics, so the first page header,
    // which follows the 4-byte file header, is decoded with the thrift reader.
    let mut prot = ThriftSliceInputProtocol::new(&file[4..]);
    let header = PageHeader::read_thrift(&mut prot).unwrap();
    let stats = header.data_page_header.unwrap().statistics.unwrap();

    assert!(stats.is_max_value_exact.unwrap());
    assert!(stats.is_min_value_exact.unwrap());
    assert_eq!(stats.max_value.unwrap(), "Blart Versenwald III".as_bytes());
    assert_eq!(stats.min_value.unwrap(), "Andrew Lamb".as_bytes());
}
5209
#[test]
fn test_page_stats_truncation() {
    // With a truncate length of 2, the page-header bounds for
    // "Blart Versenwald III" must be stored as min "Bl" and max "Bm", both
    // flagged inexact. The test checks a string column and a binary column.
    let schema = Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Binary, false),
    ]);
    let text = "Blart Versenwald III";
    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![
            Arc::new(StringArray::from(vec![text])),
            Arc::new(BinaryArray::from(vec![text.as_bytes()])),
        ],
    )
    .unwrap();

    let props = WriterProperties::builder()
        .set_statistics_truncate_length(Some(2))
        .set_dictionary_enabled(false)
        .set_encoding(Encoding::PLAIN)
        .set_write_page_header_statistics(true)
        .set_compression(crate::basic::Compression::UNCOMPRESSED)
        .build();
    let file = roundtrip_opts(&batch, props);

    // No Rust API exposes page-header statistics, so the page headers are
    // walked with the thrift reader, starting after the 4-byte file header.
    let mut prot = ThriftSliceInputProtocol::new(&file[4..]);
    let first = PageHeader::read_thrift(&mut prot).unwrap();
    let first_body_len = first.compressed_page_size as usize;
    let first_stats = first.data_page_header.unwrap().statistics.unwrap();
    assert!(!first_stats.is_max_value_exact.unwrap());
    assert!(!first_stats.is_min_value_exact.unwrap());
    assert_eq!(first_stats.max_value.unwrap(), "Bm".as_bytes());
    assert_eq!(first_stats.min_value.unwrap(), "Bl".as_bytes());

    // Skip the first page's body to reach the next page header. This is
    // expected to be the single data page of column "b".
    let remaining = &prot.as_slice()[first_body_len..];
    let mut prot = ThriftSliceInputProtocol::new(remaining);
    let second = PageHeader::read_thrift(&mut prot).unwrap();
    let second_stats = second.data_page_header.unwrap().statistics.unwrap();
    assert!(!second_stats.is_max_value_exact.unwrap());
    assert!(!second_stats.is_min_value_exact.unwrap());
    assert_eq!(second_stats.max_value.unwrap(), "Bm".as_bytes());
    assert_eq!(second_stats.min_value.unwrap(), "Bl".as_bytes());
}
5270
#[test]
fn test_page_encoding_statistics_roundtrip() {
    // The page encoding stats reported by `close` must equal those decoded
    // from the file when the reader's encoding-stats mask conversion is disabled.
    let batch_schema = Schema::new(vec![Field::new(
        "int32",
        arrow_schema::DataType::Int32,
        false,
    )]);
    let batch = RecordBatch::try_new(
        Arc::new(batch_schema.clone()),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
    )
    .unwrap();

    let mut file: File = tempfile::tempfile().unwrap();
    let mut writer = ArrowWriter::try_new(&mut file, Arc::new(batch_schema), None).unwrap();
    writer.write(&batch).unwrap();
    let written = writer.close().unwrap();

    assert_eq!(written.num_row_groups(), 1);
    let written_row_group = written.row_group(0);
    assert_eq!(written_row_group.num_columns(), 1);
    let expected_stats = written_row_group
        .column(0)
        .page_encoding_stats()
        .unwrap();

    // Re-read the footer from disk and compare.
    let options = ReadOptionsBuilder::new()
        .with_page_index()
        .with_encoding_stats_as_mask(false)
        .build();
    let reader = SerializedFileReader::new_with_options(file, options).unwrap();

    let row_group = reader.get_row_group(0).expect("row group missing");
    assert_eq!(row_group.num_columns(), 1);
    let read_stats = row_group
        .metadata()
        .column(0)
        .page_encoding_stats()
        .unwrap();
    assert_eq!(expected_stats, read_stats);
}
5319
#[test]
fn test_different_dict_page_size_limit() {
    // A per-column dictionary page size limit ("col1") must override the
    // global limit, which still applies to "col0".
    const MIB: usize = 1024 * 1024;

    let values = Arc::new(Int64Array::from_iter(0..1024 * 1024));
    let schema = Arc::new(Schema::new(vec![
        Field::new("col0", arrow_schema::DataType::Int64, false),
        Field::new("col1", arrow_schema::DataType::Int64, false),
    ]));
    let batch =
        arrow_array::RecordBatch::try_new(schema.clone(), vec![values.clone(), values]).unwrap();

    let props = WriterProperties::builder()
        .set_dictionary_page_size_limit(MIB)
        .set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 4 * MIB)
        .build();
    let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
    writer.write(&batch).unwrap();
    let data = Bytes::from(writer.into_inner().unwrap());

    let mut metadata_reader = ParquetMetaDataReader::new();
    metadata_reader.try_parse(&data).unwrap();
    let metadata = metadata_reader.finish().unwrap();
    let row_group = metadata.row_group(0);

    // Reads the first page of a column chunk, which must be its dictionary
    // page, and returns that page's buffer length.
    let dictionary_page_len = |meta: &ColumnChunkMetaData| {
        let mut pages =
            SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
        match pages.get_next_page().unwrap().unwrap() {
            Page::DictionaryPage { buf, .. } => buf.len(),
            _ => panic!("expected DictionaryPage"),
        }
    };

    assert_eq!(dictionary_page_len(row_group.column(0)), MIB);
    assert_eq!(dictionary_page_len(row_group.column(1)), 4 * MIB);
}
5357
5358    #[test]
5359    fn test_arrow_writer_granular_mode_roundtrip() {
5360        // Granular mode subdivides chunks and writes more pages than the
5361        // default batched path. Make sure the data we write back is
5362        // bit-identical to what went in — page-count assertions elsewhere
5363        // only prove pages were cut, not that the encoded data is correct.
5364        //
5365        // Mix value sizes so that the cumulative-byte-budget cutoff
5366        // lands mid-chunk, exercising both batched and granular paths
5367        // within the same `write_batch_internal` call.
5368        let small = "tiny".to_string();
5369        let big = "x".repeat(64 * 1024);
5370        let strings: Vec<String> = (0..256)
5371            .map(|i| {
5372                if i % 16 == 0 {
5373                    big.clone()
5374                } else {
5375                    small.clone()
5376                }
5377            })
5378            .collect();
5379
5380        let schema = Arc::new(Schema::new(vec![Field::new(
5381            "col",
5382            ArrowDataType::Utf8,
5383            false,
5384        )]));
5385        let batch = RecordBatch::try_new(
5386            schema.clone(),
5387            vec![Arc::new(StringArray::from(strings.clone())) as _],
5388        )
5389        .unwrap();
5390
5391        let props = WriterProperties::builder()
5392            .set_dictionary_enabled(false)
5393            .set_data_page_size_limit(16 * 1024)
5394            .build();
5395        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
5396        writer.write(&batch).unwrap();
5397        let data = Bytes::from(writer.into_inner().unwrap());
5398
5399        let mut reader = ParquetRecordBatchReader::try_new(data, 1024).unwrap();
5400        let read = reader.next().unwrap().unwrap();
5401        assert!(reader.next().is_none(), "expected one batch");
5402        let col = read
5403            .column(0)
5404            .as_any()
5405            .downcast_ref::<StringArray>()
5406            .unwrap();
5407        assert_eq!(col.len(), strings.len());
5408        for (i, expected) in strings.iter().enumerate() {
5409            assert_eq!(
5410                col.value(i),
5411                expected.as_str(),
5412                "value mismatch at index {i}"
5413            );
5414        }
5415    }
5416
5417    #[test]
5418    fn test_arrow_writer_all_null_string_column() {
5419        // The `LevelDataRef::value_count` Uniform branch with
5420        // `value != max_def` (entirely-null chunk) must return 0 so the
5421        // sub-batch sizer short-circuits to batch mode without trying
5422        // to estimate byte budgets for non-existent values.
5423        let num_rows = 1024;
5424        let schema = Arc::new(Schema::new(vec![Field::new(
5425            "col",
5426            ArrowDataType::Utf8,
5427            true,
5428        )]));
5429        let nulls: Vec<Option<&str>> = vec![None; num_rows];
5430        let batch = RecordBatch::try_new(
5431            schema.clone(),
5432            vec![Arc::new(StringArray::from(nulls)) as _],
5433        )
5434        .unwrap();
5435
5436        let props = WriterProperties::builder()
5437            .set_dictionary_enabled(false)
5438            .set_data_page_size_limit(16 * 1024)
5439            .build();
5440        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
5441        writer.write(&batch).unwrap();
5442        let data = Bytes::from(writer.into_inner().unwrap());
5443
5444        // Re-parse the file: row group has one column, every row is
5445        // null, all data pages report `num_rows / page_count` rows.
5446        let mut metadata = ParquetMetaDataReader::new();
5447        metadata.try_parse(&data).unwrap();
5448        let metadata = metadata.finish().unwrap();
5449        let row_group = metadata.row_group(0);
5450        let col_meta = row_group.column(0);
5451        assert_eq!(row_group.num_rows() as usize, num_rows);
5452        // Statistics record `null_count = num_rows` — proves every value
5453        // was written as null.
5454        if let Some(stats) = col_meta.statistics() {
5455            assert_eq!(
5456                stats.null_count_opt().unwrap_or(0) as usize,
5457                num_rows,
5458                "expected all-null column to report null_count = num_rows"
5459            );
5460        }
5461
5462        let mut reader =
5463            SerializedPageReader::new(Arc::new(data.clone()), col_meta, num_rows, None).unwrap();
5464        let mut total_values = 0u32;
5465        while let Some(page) = reader.get_next_page().unwrap() {
5466            if matches!(page, Page::DataPage { .. } | Page::DataPageV2 { .. }) {
5467                total_values += page.num_values();
5468            }
5469        }
5470        assert_eq!(
5471            total_values as usize, num_rows,
5472            "expected every level position to be represented in some page"
5473        );
5474    }
5475
5476    struct WriteBatchesShape {
5477        num_batches: usize,
5478        rows_per_batch: usize,
5479        row_size: usize,
5480    }
5481
5482    /// Helper function to write batches with the provided `WriteBatchesShape` into an `ArrowWriter`
5483    fn write_batches(
5484        WriteBatchesShape {
5485            num_batches,
5486            rows_per_batch,
5487            row_size,
5488        }: WriteBatchesShape,
5489        props: WriterProperties,
5490    ) -> ParquetRecordBatchReaderBuilder<File> {
5491        let schema = Arc::new(Schema::new(vec![Field::new(
5492            "str",
5493            ArrowDataType::Utf8,
5494            false,
5495        )]));
5496        let file = tempfile::tempfile().unwrap();
5497        let mut writer =
5498            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
5499
5500        for batch_idx in 0..num_batches {
5501            let strings: Vec<String> = (0..rows_per_batch)
5502                .map(|i| format!("{:0>width$}", batch_idx * 10 + i, width = row_size))
5503                .collect();
5504            let array = StringArray::from(strings);
5505            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
5506            writer.write(&batch).unwrap();
5507        }
5508        writer.close().unwrap();
5509        ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
5510    }
5511
5512    #[test]
5513    // When both limits are None, all data should go into a single row group
5514    fn test_row_group_limit_none_writes_single_row_group() {
5515        let props = WriterProperties::builder()
5516            .set_max_row_group_row_count(None)
5517            .set_max_row_group_bytes(None)
5518            .build();
5519
5520        let builder = write_batches(
5521            WriteBatchesShape {
5522                num_batches: 1,
5523                rows_per_batch: 1000,
5524                row_size: 4,
5525            },
5526            props,
5527        );
5528
5529        assert_eq!(
5530            &row_group_sizes(builder.metadata()),
5531            &[1000],
5532            "With no limits, all rows should be in a single row group"
5533        );
5534    }
5535
5536    #[test]
5537    // When only max_row_group_size is set, respect the row limit
5538    fn test_row_group_limit_rows_only() {
5539        let props = WriterProperties::builder()
5540            .set_max_row_group_row_count(Some(300))
5541            .set_max_row_group_bytes(None)
5542            .build();
5543
5544        let builder = write_batches(
5545            WriteBatchesShape {
5546                num_batches: 1,
5547                rows_per_batch: 1000,
5548                row_size: 4,
5549            },
5550            props,
5551        );
5552
5553        assert_eq!(
5554            &row_group_sizes(builder.metadata()),
5555            &[300, 300, 300, 100],
5556            "Row groups should be split by row count"
5557        );
5558    }
5559
5560    #[test]
5561    // When only max_row_group_bytes is set, respect the byte limit
5562    fn test_row_group_limit_bytes_only() {
5563        let props = WriterProperties::builder()
5564            .set_max_row_group_row_count(None)
5565            // Set byte limit to approximately fit ~30 rows worth of data (~100 bytes each)
5566            .set_max_row_group_bytes(Some(3500))
5567            .build();
5568
5569        let builder = write_batches(
5570            WriteBatchesShape {
5571                num_batches: 10,
5572                rows_per_batch: 10,
5573                row_size: 100,
5574            },
5575            props,
5576        );
5577
5578        let sizes = row_group_sizes(builder.metadata());
5579
5580        assert!(
5581            sizes.len() > 1,
5582            "Should have multiple row groups due to byte limit, got {sizes:?}",
5583        );
5584
5585        let total_rows: i64 = sizes.iter().sum();
5586        assert_eq!(total_rows, 100, "Total rows should be preserved");
5587    }
5588
5589    #[test]
5590    // If an in-progress row group is already oversized, it should be flushed before writing more.
5591    fn test_row_group_limit_bytes_flushes_when_current_group_already_too_large() {
5592        let schema = Arc::new(Schema::new(vec![Field::new(
5593            "str",
5594            ArrowDataType::Utf8,
5595            false,
5596        )]));
5597        let file = tempfile::tempfile().unwrap();
5598
5599        // Start with no byte limit so we can intentionally build an oversized in-progress row group.
5600        let props = WriterProperties::builder()
5601            .set_max_row_group_row_count(None)
5602            .set_max_row_group_bytes(None)
5603            .build();
5604        let mut writer =
5605            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
5606
5607        let first_array = StringArray::from(
5608            (0..10)
5609                .map(|i| format!("{:0>100}", i))
5610                .collect::<Vec<String>>(),
5611        );
5612        let first_batch =
5613            RecordBatch::try_new(schema.clone(), vec![Arc::new(first_array)]).unwrap();
5614        writer.write(&first_batch).unwrap();
5615        assert_eq!(writer.in_progress_rows(), 10);
5616
5617        // Tighten the limit below the current in-progress bytes to exercise:
5618        // `if current_bytes >= max_bytes { self.flush()?; ... }`
5619        writer.max_row_group_bytes = Some(1);
5620
5621        let second_array = StringArray::from(vec!["x".to_string()]);
5622        let second_batch =
5623            RecordBatch::try_new(schema.clone(), vec![Arc::new(second_array)]).unwrap();
5624        writer.write(&second_batch).unwrap();
5625        writer.close().unwrap();
5626        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
5627
5628        assert_eq!(
5629            &row_group_sizes(builder.metadata()),
5630            &[10, 1],
5631            "The second write should flush an oversized in-progress row group first",
5632        );
5633    }
5634
5635    #[test]
5636    // When both limits are set, the row limit triggers first
5637    fn test_row_group_limit_both_row_wins_single_batch() {
5638        let props = WriterProperties::builder()
5639            .set_max_row_group_row_count(Some(200)) // Will trigger at 200 rows
5640            .set_max_row_group_bytes(Some(1024 * 1024)) // 1MB - won't trigger for small int data
5641            .build();
5642
5643        let builder = write_batches(
5644            WriteBatchesShape {
5645                num_batches: 1,
5646                row_size: 4,
5647                rows_per_batch: 1000,
5648            },
5649            props,
5650        );
5651
5652        assert_eq!(
5653            &row_group_sizes(builder.metadata()),
5654            &[200, 200, 200, 200, 200],
5655            "Row limit should trigger before byte limit"
5656        );
5657    }
5658
5659    #[test]
5660    // When both limits are set, the row limit triggers first
5661    fn test_row_group_limit_both_row_wins_multiple_batches() {
5662        let props = WriterProperties::builder()
5663            .set_max_row_group_row_count(Some(5)) // Will trigger every 5 rows
5664            .set_max_row_group_bytes(Some(9999)) // Won't trigger
5665            .build();
5666
5667        let builder = write_batches(
5668            WriteBatchesShape {
5669                num_batches: 10,
5670                rows_per_batch: 10,
5671                row_size: 100,
5672            },
5673            props,
5674        );
5675
5676        assert_eq!(
5677            &row_group_sizes(builder.metadata()),
5678            &[5; 20],
5679            "Row limit should trigger before byte limit"
5680        );
5681    }
5682
5683    #[test]
5684    // When both limits are set, the byte limit triggers first
5685    fn test_row_group_limit_both_bytes_wins() {
5686        let props = WriterProperties::builder()
5687            .set_max_row_group_row_count(Some(1000)) // Won't trigger for 100 rows
5688            .set_max_row_group_bytes(Some(3500)) // Will trigger at ~30-35 rows
5689            .build();
5690
5691        let builder = write_batches(
5692            WriteBatchesShape {
5693                num_batches: 10,
5694                rows_per_batch: 10,
5695                row_size: 100,
5696            },
5697            props,
5698        );
5699
5700        let sizes = row_group_sizes(builder.metadata());
5701
5702        assert!(
5703            sizes.len() > 1,
5704            "Byte limit should trigger before row limit, got {sizes:?}",
5705        );
5706
5707        assert!(
5708            sizes.iter().all(|&s| s < 1000),
5709            "No row group should hit the row limit"
5710        );
5711
5712        let total_rows: i64 = sizes.iter().sum();
5713        assert_eq!(total_rows, 100, "Total rows should be preserved");
5714    }
5715
5716    #[test]
5717    fn arrow_column_chunk_close_mut_drops_column_index() {
5718        use crate::arrow::ArrowSchemaConverter;
5719        use crate::file::writer::SerializedFileWriter;
5720
5721        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
5722        let props = Arc::new(
5723            WriterProperties::builder()
5724                .set_statistics_enabled(EnabledStatistics::Page)
5725                .build(),
5726        );
5727        let parquet_schema = ArrowSchemaConverter::new()
5728            .with_coerce_types(props.coerce_types())
5729            .convert(&schema)
5730            .unwrap();
5731
5732        let mut buf = Vec::with_capacity(1024);
5733        let mut writer =
5734            SerializedFileWriter::new(&mut buf, parquet_schema.root_schema_ptr(), props.clone())
5735                .unwrap();
5736
5737        let factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(&schema));
5738        let mut col_writers = factory.create_column_writers(0).unwrap();
5739        let arr: ArrayRef = Arc::new(Int32Array::from_iter_values(0..64));
5740        for leaves in compute_leaves(schema.field(0), &arr).unwrap() {
5741            col_writers[0].write(&leaves).unwrap();
5742        }
5743        let mut chunk = col_writers.pop().unwrap().close().unwrap();
5744
5745        // Immutable accessor exposes the close result produced at close time.
5746        assert!(
5747            chunk.close().column_index.is_some(),
5748            "EnabledStatistics::Page should produce a column_index"
5749        );
5750
5751        // Mutable accessor lets callers drop the page-level index before append.
5752        chunk.close_mut().column_index = None;
5753        assert!(chunk.close().column_index.is_none());
5754
5755        let mut rg = writer.next_row_group().unwrap();
5756        chunk.append_to_row_group(&mut rg).unwrap();
5757        rg.close().unwrap();
5758        let file_meta = writer.close().unwrap();
5759
5760        // After dropping column_index, the resulting file records no column
5761        // index offset/length for this chunk.
5762        let cc = file_meta.row_group(0).column(0);
5763        assert!(cc.column_index_range().is_none());
5764    }
5765
5766    /// Writes a single-column RecordBatch to an in-memory Parquet buffer.
5767    fn write_column_to_bytes(array: ArrayRef) -> Bytes {
5768        let schema = Arc::new(Schema::new(vec![Field::new(
5769            "col",
5770            array.data_type().clone(),
5771            true,
5772        )]));
5773        let buf = get_bytes_after_close(
5774            schema.clone(),
5775            &RecordBatch::try_new(schema, vec![array]).unwrap(),
5776        );
5777        Bytes::from(buf)
5778    }
5779
5780    /// Reads column 0 from a single-row-group Parquet buffer, projecting it with the given schema.
5781    /// Passing a flat schema when the buffer was written from a REE array lets callers decode
5782    /// the physical values without the run-end encoding wrapper.
5783    fn read_column_with_schema(bytes: Bytes, schema: SchemaRef) -> ArrayRef {
5784        let opts = crate::arrow::arrow_reader::ArrowReaderOptions::new().with_schema(schema);
5785        ParquetRecordBatchReaderBuilder::try_new_with_options(bytes, opts)
5786            .unwrap()
5787            .build()
5788            .unwrap()
5789            .next()
5790            .unwrap()
5791            .unwrap()
5792            .column(0)
5793            .clone()
5794    }
5795
5796    fn ree_write_read_roundtrip(ree: ArrayRef, flat: ArrayRef) {
5797        let flat_schema = Arc::new(Schema::new(vec![Field::new(
5798            "col",
5799            flat.data_type().clone(),
5800            true,
5801        )]));
5802        let ree_bytes = write_column_to_bytes(ree);
5803        let flat_bytes = write_column_to_bytes(flat.clone());
5804        assert_eq!(
5805            ree_bytes, flat_bytes,
5806            "REE and flat bytes should be identical"
5807        );
5808
5809        let decoded_ree = read_column_with_schema(ree_bytes, flat_schema.clone());
5810        let decoded_flat = read_column_with_schema(flat_bytes, flat_schema);
5811
5812        assert_eq!(decoded_ree.as_ref(), flat.as_ref());
5813        assert_eq!(decoded_ree.as_ref(), decoded_flat.as_ref());
5814    }
5815
5816    #[test]
5817    fn ree_string() {
5818        let ree: ArrayRef = Arc::new(
5819            [Some("a"), Some("a"), None, Some("b"), Some("b")]
5820                .into_iter()
5821                .collect::<Int32RunArray>(),
5822        );
5823        let flat: ArrayRef = Arc::new(StringArray::from(vec![
5824            Some("a"),
5825            Some("a"),
5826            None,
5827            Some("b"),
5828            Some("b"),
5829        ]));
5830        ree_write_read_roundtrip(ree, flat);
5831    }
5832
5833    #[test]
5834    fn ree_int32() {
5835        let mut b = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
5836        for v in [Some(1), Some(1), None, Some(2), Some(2)] {
5837            b.append_option(v);
5838        }
5839        let ree: ArrayRef = Arc::new(b.finish());
5840        let flat: ArrayRef = Arc::new(Int32Array::from(vec![
5841            Some(1),
5842            Some(1),
5843            None,
5844            Some(2),
5845            Some(2),
5846        ]));
5847        ree_write_read_roundtrip(ree, flat);
5848    }
5849
5850    #[test]
5851    fn ree_bool() {
5852        // run_ends [3, 5, 7] → [T,T,T, null,null, F,F]
5853        let ree: ArrayRef = Arc::new(
5854            RunArray::try_new(
5855                &Int32Array::from(vec![3, 5, 7]),
5856                &BooleanArray::from(vec![Some(true), None, Some(false)]),
5857            )
5858            .unwrap(),
5859        );
5860        let flat: ArrayRef = Arc::new(BooleanArray::from(vec![
5861            Some(true),
5862            Some(true),
5863            Some(true),
5864            None,
5865            None,
5866            Some(false),
5867            Some(false),
5868        ]));
5869        ree_write_read_roundtrip(ree, flat);
5870    }
5871
5872    #[test]
5873    fn ree_fixed_size_binary() {
5874        let mk = |vals: &[Option<&[u8]>]| -> FixedSizeBinaryArray {
5875            let mut b = FixedSizeBinaryBuilder::new(2);
5876            for v in vals {
5877                match v {
5878                    Some(x) => b.append_value(x).unwrap(),
5879                    None => b.append_null(),
5880                }
5881            }
5882            b.finish()
5883        };
5884        // run_ends [2, 4, 6] → [aa,aa, null,null, bb,bb]
5885        let ree: ArrayRef = Arc::new(
5886            RunArray::try_new(
5887                &Int32Array::from(vec![2, 4, 6]),
5888                &mk(&[Some(b"aa"), None, Some(b"bb")]),
5889            )
5890            .unwrap(),
5891        );
5892        let flat: ArrayRef = Arc::new(mk(&[
5893            Some(b"aa"),
5894            Some(b"aa"),
5895            None,
5896            None,
5897            Some(b"bb"),
5898            Some(b"bb"),
5899        ]));
5900        ree_write_read_roundtrip(ree, flat);
5901    }
5902
5903    #[test]
5904    fn ree_single_run() {
5905        let ree: ArrayRef = Arc::new(["x", "x", "x"].into_iter().collect::<Int32RunArray>());
5906        let flat: ArrayRef = Arc::new(StringArray::from(vec!["x", "x", "x"]));
5907        ree_write_read_roundtrip(ree, flat);
5908    }
5909
5910    #[test]
5911    fn ree_float32() {
5912        // run_ends [2, 4, 5] → [1.0, 1.0, null, null, 2.5]
5913        let ree: ArrayRef = Arc::new(
5914            RunArray::try_new(
5915                &Int32Array::from(vec![2, 4, 5]),
5916                &Float32Array::from(vec![Some(1.0_f32), None, Some(2.5_f32)]),
5917            )
5918            .unwrap(),
5919        );
5920        let flat: ArrayRef = Arc::new(Float32Array::from(vec![
5921            Some(1.0_f32),
5922            Some(1.0_f32),
5923            None,
5924            None,
5925            Some(2.5_f32),
5926        ]));
5927        ree_write_read_roundtrip(ree, flat);
5928    }
5929
5930    #[test]
5931    fn ree_sliced() {
5932        // A sliced (non-zero offset) REE array: verify that get_physical_index
5933        // correctly accounts for the logical offset when expanding.
5934        // Full array: run_ends [3, 5, 7] → [a,a,a, b,b, c,c]
5935        // After slice(2, 5) the logical view is [a, b, b, c, c].
5936        let full: ArrayRef = Arc::new(
5937            RunArray::try_new(
5938                &Int32Array::from(vec![3, 5, 7]),
5939                &StringArray::from(vec!["a", "b", "c"]),
5940            )
5941            .unwrap(),
5942        );
5943        let sliced = full.slice(2, 5);
5944        let flat: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "b", "c", "c"]));
5945        ree_write_read_roundtrip(sliced, flat);
5946    }
5947
5948    #[test]
5949    fn ree_struct_with_ree_child() {
5950        // Struct with a REE string field and a REE int field — confirms
5951        // recursion visits every child and each collapses to the right leaf type.
5952        let run_ends = Int32Array::from(vec![2i32, 3, 5]);
5953
5954        let col_a: ArrayRef = Arc::new(
5955            RunArray::try_new(
5956                &run_ends,
5957                &StringArray::from(vec![Some("foo"), None, Some("bar")]),
5958            )
5959            .unwrap(),
5960        );
5961        let col_b: ArrayRef = Arc::new(
5962            RunArray::try_new(&run_ends, &Int32Array::from(vec![Some(1), None, Some(2)])).unwrap(),
5963        );
5964
5965        let struct_array: ArrayRef = Arc::new(StructArray::new(
5966            Fields::from(vec![
5967                Field::new("a", col_a.data_type().clone(), true),
5968                Field::new("b", col_b.data_type().clone(), true),
5969            ]),
5970            vec![col_a, col_b],
5971            None,
5972        ));
5973
5974        let schema = Arc::new(Schema::new(vec![Field::new(
5975            "row",
5976            struct_array.data_type().clone(),
5977            true,
5978        )]));
5979        let batch = RecordBatch::try_new(schema.clone(), vec![struct_array]).unwrap();
5980
5981        let mut buf = Vec::new();
5982        let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap();
5983        writer.write(&batch).unwrap();
5984        let metadata = writer.close().unwrap();
5985
5986        let parquet_schema = metadata.file_metadata().schema_descr();
5987        assert_eq!(parquet_schema.num_columns(), 2);
5988        assert_eq!(
5989            parquet_schema.column(0).physical_type(),
5990            crate::basic::Type::BYTE_ARRAY
5991        );
5992        assert_eq!(parquet_schema.column(0).path().string(), "row.a");
5993        assert_eq!(
5994            parquet_schema.column(1).physical_type(),
5995            crate::basic::Type::INT32
5996        );
5997        assert_eq!(parquet_schema.column(1).path().string(), "row.b");
5998    }
5999}