Skip to main content

parquet/arrow/arrow_writer/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains writer which writes arrow data into parquet data.
19
20use crate::column::chunker::ContentDefinedChunker;
21
22use bytes::Bytes;
23use std::io::{Read, Write};
24use std::slice::Iter;
25use std::sync::{Arc, Mutex};
26use std::vec::IntoIter;
27
28use arrow_array::cast::AsArray;
29use arrow_array::types::*;
30use arrow_array::{ArrayRef, Int32Array, RecordBatch, RecordBatchWriter};
31use arrow_schema::{
32    ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef, TimeUnit,
33};
34
35use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};
36
37use crate::arrow::ArrowSchemaConverter;
38use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
39use crate::basic::PageType;
40use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
41use crate::column::page_encryption::PageEncryptor;
42use crate::column::writer::encoder::ColumnValueEncoder;
43use crate::column::writer::{
44    ColumnCloseResult, ColumnWriter, GenericColumnWriter, get_column_writer,
45};
46use crate::data_type::{ByteArray, FixedLenByteArray};
47#[cfg(feature = "encryption")]
48use crate::encryption::encrypt::FileEncryptor;
49use crate::errors::{ParquetError, Result};
50use crate::file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaData};
51use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
52use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
53use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
54use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
55use levels::{ArrayLevels, calculate_array_levels};
56
57mod byte_array;
58mod levels;
59
60#[doc(inline)]
61pub use crate::column::page_store::{
62    InMemoryPageStore, InMemoryPageStoreFactory, PageKey, PageStore, PageStoreArgs,
63    PageStoreFactory,
64};
65
66/// Encodes [`RecordBatch`] to parquet
67///
68/// Writes Arrow `RecordBatch`es to a Parquet writer. Multiple [`RecordBatch`] will be encoded
69/// to the same row group, up to `max_row_group_size` rows. Any remaining rows will be
70/// flushed on close, leading the final row group in the output file to potentially
71/// contain fewer than `max_row_group_size` rows
72///
73/// # Example: Writing `RecordBatch`es
74/// ```
75/// # use std::sync::Arc;
76/// # use bytes::Bytes;
77/// # use arrow_array::{ArrayRef, Int64Array};
78/// # use arrow_array::RecordBatch;
79/// # use parquet::arrow::arrow_writer::ArrowWriter;
80/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
81/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
82/// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
83///
84/// let mut buffer = Vec::new();
85/// let mut writer = ArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
86/// writer.write(&to_write).unwrap();
87/// writer.close().unwrap();
88///
89/// let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap();
90/// let read = reader.next().unwrap().unwrap();
91///
92/// assert_eq!(to_write, read);
93/// ```
94///
95/// # Memory Usage and Limiting
96///
97/// The nature of Parquet requires buffering of an entire row group before it can
98/// be flushed to the underlying writer. Data is mostly buffered in its encoded
99/// form, reducing memory usage. However, some data such as dictionary keys,
100/// large strings or very nested data may still result in non-trivial memory
101/// usage.
102///
103/// See Also:
104/// * [`ArrowWriter::memory_size`]: the current memory usage of the writer.
/// * [`ArrowWriter::in_progress_size`]: the estimated encoded size of the buffered row group.
106///
/// Call [`Self::flush`] to trigger an early flush of a row group based on a
/// memory threshold and/or global memory pressure. However, smaller row groups
/// result in higher metadata overheads, and thus may worsen compression ratios
/// and query performance.
111///
112/// ```no_run
113/// # use std::io::Write;
114/// # use arrow_array::RecordBatch;
115/// # use parquet::arrow::ArrowWriter;
116/// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
117/// # let batch: RecordBatch = todo!();
118/// writer.write(&batch).unwrap();
119/// // Trigger an early flush if anticipated size exceeds 1_000_000
120/// if writer.in_progress_size() > 1_000_000 {
121///     writer.flush().unwrap();
122/// }
123/// ```
124///
125/// ## Type Support
126///
127/// The writer supports writing all Arrow [`DataType`]s that have a direct mapping to
128/// Parquet types including  [`StructArray`] and [`ListArray`].
129///
130/// The following are not supported:
131///
132/// * [`IntervalMonthDayNanoArray`]: Parquet does not [support nanosecond intervals].
133///
134/// [`DataType`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html
135/// [`StructArray`]: https://docs.rs/arrow/latest/arrow/array/struct.StructArray.html
136/// [`ListArray`]: https://docs.rs/arrow/latest/arrow/array/type.ListArray.html
137/// [`IntervalMonthDayNanoArray`]: https://docs.rs/arrow/latest/arrow/array/type.IntervalMonthDayNanoArray.html
138/// [support nanosecond intervals]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
139///
140/// ## Type Compatibility
/// The writer can write Arrow [`RecordBatch`]s that are logically equivalent. This means that for
/// a given column, the writer can accept multiple Arrow [`DataType`]s that contain the same
/// value type.
144///
145/// For example, the following [`DataType`]s are all logically equivalent and can be written
146/// to the same column:
147/// * String, LargeString, StringView
148/// * Binary, LargeBinary, BinaryView
149///
/// The writer will also accept both native and dictionary encoded arrays if the dictionaries
/// contain compatible values.
152/// ```
153/// # use std::sync::Arc;
154/// # use arrow_array::{DictionaryArray, LargeStringArray, RecordBatch, StringArray, UInt8Array};
155/// # use arrow_schema::{DataType, Field, Schema};
156/// # use parquet::arrow::arrow_writer::ArrowWriter;
157/// let record_batch1 = RecordBatch::try_new(
158///    Arc::new(Schema::new(vec![Field::new("col", DataType::LargeUtf8, false)])),
159///    vec![Arc::new(LargeStringArray::from_iter_values(vec!["a", "b"]))]
160///  )
161/// .unwrap();
162///
163/// let mut buffer = Vec::new();
164/// let mut writer = ArrowWriter::try_new(&mut buffer, record_batch1.schema(), None).unwrap();
165/// writer.write(&record_batch1).unwrap();
166///
167/// let record_batch2 = RecordBatch::try_new(
168///     Arc::new(Schema::new(vec![Field::new(
169///         "col",
170///         DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
171///          false,
172///     )])),
173///     vec![Arc::new(DictionaryArray::new(
174///          UInt8Array::from_iter_values(vec![0, 1]),
175///          Arc::new(StringArray::from_iter_values(vec!["b", "c"])),
176///      ))],
177///  )
178///  .unwrap();
179///  writer.write(&record_batch2).unwrap();
180///  writer.close();
181/// ```
pub struct ArrowWriter<W: Write> {
    /// Underlying Parquet writer
    ///
    /// Completed row groups are appended to this writer on every flush.
    writer: SerializedFileWriter<W>,

    /// The in-progress row group if any
    ///
    /// Created lazily when a non-empty batch is written and taken (reset to `None`)
    /// whenever the writer is flushed.
    in_progress: Option<ArrowRowGroupWriter>,

    /// A copy of the Arrow schema.
    ///
    /// The schema is used to verify that each record batch written has the correct schema
    arrow_schema: SchemaRef,

    /// Creates new [`ArrowRowGroupWriter`] instances as required
    row_group_writer_factory: ArrowRowGroupWriterFactory,

    /// The maximum number of rows to write to each row group, or None for unlimited
    ///
    /// Read once from the [`WriterProperties`] when the writer is created.
    max_row_group_row_count: Option<usize>,

    /// The maximum size in bytes for a row group, or None for unlimited
    ///
    /// Read once from the [`WriterProperties`] when the writer is created.
    max_row_group_bytes: Option<usize>,

    /// CDC chunkers persisted across row groups (one per leaf column).
    ///
    /// `None` when the [`WriterProperties`] do not enable content-defined chunking.
    cdc_chunkers: Option<Vec<ContentDefinedChunker>>,
}
206
207impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
208    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209        let buffered_memory = self.in_progress_size();
210        f.debug_struct("ArrowWriter")
211            .field("writer", &self.writer)
212            .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
213            .field("in_progress_rows", &self.in_progress_rows())
214            .field("arrow_schema", &self.arrow_schema)
215            .field("max_row_group_row_count", &self.max_row_group_row_count)
216            .field("max_row_group_bytes", &self.max_row_group_bytes)
217            .finish()
218    }
219}
220
221impl<W: Write + Send> ArrowWriter<W> {
222    /// Try to create a new Arrow writer
223    ///
224    /// The writer will fail if:
225    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
226    ///  * the Arrow schema contains unsupported datatypes such as Unions
227    pub fn try_new(
228        writer: W,
229        arrow_schema: SchemaRef,
230        props: Option<WriterProperties>,
231    ) -> Result<Self> {
232        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
233        Self::try_new_with_options(writer, arrow_schema, options)
234    }
235
236    /// Try to create a new Arrow writer with [`ArrowWriterOptions`].
237    ///
238    /// The writer will fail if:
239    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
240    ///  * the Arrow schema contains unsupported datatypes such as Unions
241    pub fn try_new_with_options(
242        writer: W,
243        arrow_schema: SchemaRef,
244        options: ArrowWriterOptions,
245    ) -> Result<Self> {
246        let mut props = options.properties;
247
248        let schema = if let Some(parquet_schema) = options.schema_descr {
249            parquet_schema.clone()
250        } else {
251            let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
252            if let Some(schema_root) = &options.schema_root {
253                converter = converter.schema_root(schema_root);
254            }
255
256            converter.convert(&arrow_schema)?
257        };
258
259        if !options.skip_arrow_metadata {
260            // add serialized arrow schema
261            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
262        }
263
264        let max_row_group_row_count = props.max_row_group_row_count();
265        let max_row_group_bytes = props.max_row_group_bytes();
266
267        let props_ptr = Arc::new(props);
268        let file_writer =
269            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;
270
271        let mut row_group_writer_factory =
272            ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone());
273        if let Some(page_store_factory) = options.page_store_factory {
274            row_group_writer_factory =
275                row_group_writer_factory.with_page_store_factory(page_store_factory);
276        }
277
278        let cdc_chunkers = props_ptr
279            .content_defined_chunking()
280            .map(|opts| {
281                file_writer
282                    .schema_descr()
283                    .columns()
284                    .iter()
285                    .map(|desc| ContentDefinedChunker::new(desc, opts))
286                    .collect::<Result<Vec<_>>>()
287            })
288            .transpose()?;
289
290        Ok(Self {
291            writer: file_writer,
292            in_progress: None,
293            arrow_schema,
294            row_group_writer_factory,
295            max_row_group_row_count,
296            max_row_group_bytes,
297            cdc_chunkers,
298        })
299    }
300
301    /// Returns metadata for any flushed row groups
302    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
303        self.writer.flushed_row_groups()
304    }
305
306    /// Estimated memory usage, in bytes, of this `ArrowWriter`
307    ///
308    /// This estimate is formed bu summing the values of
309    /// [`ArrowColumnWriter::memory_size`] all in progress columns.
310    pub fn memory_size(&self) -> usize {
311        match &self.in_progress {
312            Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
313            None => 0,
314        }
315    }
316
317    /// Anticipated encoded size of the in progress row group.
318    ///
319    /// This estimate the row group size after being completely encoded is,
320    /// formed by summing the values of
321    /// [`ArrowColumnWriter::get_estimated_total_bytes`] for all in progress
322    /// columns.
323    pub fn in_progress_size(&self) -> usize {
324        match &self.in_progress {
325            Some(in_progress) => in_progress
326                .writers
327                .iter()
328                .map(|x| x.get_estimated_total_bytes())
329                .sum(),
330            None => 0,
331        }
332    }
333
334    /// Returns the number of rows buffered in the in progress row group
335    pub fn in_progress_rows(&self) -> usize {
336        self.in_progress
337            .as_ref()
338            .map(|x| x.buffered_rows)
339            .unwrap_or_default()
340    }
341
342    /// Returns the number of bytes written by this instance
343    pub fn bytes_written(&self) -> usize {
344        self.writer.bytes_written()
345    }
346
    /// Encodes the provided [`RecordBatch`]
    ///
    /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_row_count`]
    /// rows or [`WriterProperties::max_row_group_bytes`] bytes, the contents of `batch` will be
    /// written to one or more row groups such that limits are respected.
    ///
    /// If both limits are `None`, all data is written to a single row group.
    /// If one limit is set, that limit is respected.
    /// If both limits are set, the lower bound (whichever triggers first) is respected.
    ///
    /// The byte limit is applied using estimates: the average encoded row size of the data
    /// already buffered in the row group is used to predict how many rows of `batch` still fit.
    ///
    /// Empty batches are ignored and return `Ok(())` without starting a row group.
    ///
    /// This will fail if the `batch`'s schema does not match the writer's schema.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        // Nothing to encode: return before a row group writer is created
        if batch.num_rows() == 0 {
            return Ok(());
        }

        // Lazily start a row group, passing the factory the number of row groups
        // that have already been flushed
        let in_progress = match &mut self.in_progress {
            Some(in_progress) => in_progress,
            x => x.insert(
                self.row_group_writer_factory
                    .create_row_group_writer(self.writer.flushed_row_groups().len())?,
            ),
        };

        // Row limit: split the batch so the first slice fills the current row group
        // exactly, then handle the remainder recursively
        if let Some(max_rows) = self.max_row_group_row_count {
            if in_progress.buffered_rows + batch.num_rows() > max_rows {
                let to_write = max_rows - in_progress.buffered_rows;
                let a = batch.slice(0, to_write);
                let b = batch.slice(to_write, batch.num_rows() - to_write);
                self.write(&a)?;
                return self.write(&b);
            }
        }

        // Check byte limit: if we have buffered data, use measured average row size
        // to split batch proactively before exceeding byte limit
        if let Some(max_bytes) = self.max_row_group_bytes {
            if in_progress.buffered_rows > 0 {
                let current_bytes = in_progress.get_estimated_total_bytes();

                // The buffered data alone already reaches the limit: flush it and
                // retry the whole batch against a fresh row group
                if current_bytes >= max_bytes {
                    self.flush()?;
                    return self.write(batch);
                }

                // Integer division: an average below one byte per row yields 0, in
                // which case no proactive split is attempted
                let avg_row_bytes = current_bytes / in_progress.buffered_rows;
                if avg_row_bytes > 0 {
                    // At this point, `current_bytes < max_bytes` (checked above)
                    let remaining_bytes = max_bytes - current_bytes;
                    let rows_that_fit = remaining_bytes / avg_row_bytes;

                    if batch.num_rows() > rows_that_fit {
                        if rows_that_fit > 0 {
                            // Write the rows expected to fit, then the remainder recursively
                            let a = batch.slice(0, rows_that_fit);
                            let b = batch.slice(rows_that_fit, batch.num_rows() - rows_that_fit);
                            self.write(&a)?;
                            return self.write(&b);
                        } else {
                            // Not even one more row is expected to fit: flush first
                            self.flush()?;
                            return self.write(batch);
                        }
                    }
                }
            }
        }

        // The batch is within the limits: encode it, going through the
        // content-defined chunkers when they are configured
        match self.cdc_chunkers.as_mut() {
            Some(chunkers) => in_progress.write_with_chunkers(batch, chunkers)?,
            None => in_progress.write(batch)?,
        }

        // Close the row group as soon as either configured limit has been reached
        let should_flush = self
            .max_row_group_row_count
            .is_some_and(|max| in_progress.buffered_rows >= max)
            || self
                .max_row_group_bytes
                .is_some_and(|max| in_progress.get_estimated_total_bytes() >= max);

        if should_flush {
            self.flush()?
        }
        Ok(())
    }
430
431    /// Writes the given buf bytes to the internal buffer.
432    ///
433    /// It's safe to use this method to write data to the underlying writer,
434    /// because it will ensure that the buffering and byte‐counting layers are used.
435    pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
436        self.writer.write_all(buf)
437    }
438
439    /// Flushes underlying writer
440    pub fn sync(&mut self) -> std::io::Result<()> {
441        self.writer.flush()
442    }
443
444    /// Flushes all buffered rows into a new row group
445    ///
446    /// Note the underlying writer is not flushed with this call.
447    /// If this is a desired behavior, please call [`ArrowWriter::sync`].
448    pub fn flush(&mut self) -> Result<()> {
449        let in_progress = match self.in_progress.take() {
450            Some(in_progress) => in_progress,
451            None => return Ok(()),
452        };
453
454        let mut row_group_writer = self.writer.next_row_group()?;
455        for chunk in in_progress.close()? {
456            chunk.append_to_row_group(&mut row_group_writer)?;
457        }
458        row_group_writer.close()?;
459        Ok(())
460    }
461
462    /// Additional [`KeyValue`] metadata to be written in addition to those from [`WriterProperties`]
463    ///
464    /// This method provide a way to append kv_metadata after write RecordBatch
465    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
466        self.writer.append_key_value_metadata(kv_metadata)
467    }
468
469    /// Returns a reference to the underlying writer.
470    pub fn inner(&self) -> &W {
471        self.writer.inner()
472    }
473
474    /// Returns a mutable reference to the underlying writer.
475    ///
476    /// **Warning**: if you write directly to this writer, you will skip
477    /// the `TrackedWrite` buffering and byte‐counting layers. That’ll cause
478    /// the file footer’s recorded offsets and sizes to diverge from reality,
479    /// resulting in an unreadable or corrupted Parquet file.
480    ///
481    /// If you want to write safely to the underlying writer, use [`Self::write_all`].
482    pub fn inner_mut(&mut self) -> &mut W {
483        self.writer.inner_mut()
484    }
485
486    /// Flushes any outstanding data and returns the underlying writer.
487    pub fn into_inner(mut self) -> Result<W> {
488        self.flush()?;
489        self.writer.into_inner()
490    }
491
492    /// Close and finalize the underlying Parquet writer
493    ///
494    /// Unlike [`Self::close`] this does not consume self
495    ///
496    /// Attempting to write after calling finish will result in an error
497    pub fn finish(&mut self) -> Result<ParquetMetaData> {
498        self.flush()?;
499        self.writer.finish()
500    }
501
502    /// Close and finalize the underlying Parquet writer
503    pub fn close(mut self) -> Result<ParquetMetaData> {
504        self.finish()
505    }
506
507    /// Create a new row group writer and return its column writers.
508    #[deprecated(
509        since = "56.2.0",
510        note = "Use `ArrowRowGroupWriterFactory` instead, see `ArrowColumnWriter` for an example"
511    )]
512    pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
513        self.flush()?;
514        let in_progress = self
515            .row_group_writer_factory
516            .create_row_group_writer(self.writer.flushed_row_groups().len())?;
517        Ok(in_progress.writers)
518    }
519
520    /// Append the given column chunks to the file as a new row group.
521    #[deprecated(
522        since = "56.2.0",
523        note = "Use `SerializedFileWriter` directly instead, see `ArrowColumnWriter` for an example"
524    )]
525    pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
526        let mut row_group_writer = self.writer.next_row_group()?;
527        for chunk in chunks {
528            chunk.append_to_row_group(&mut row_group_writer)?;
529        }
530        row_group_writer.close()?;
531        Ok(())
532    }
533
534    /// Converts this writer into a lower-level [`SerializedFileWriter`] and [`ArrowRowGroupWriterFactory`].
535    ///
536    /// Flushes any outstanding data before returning.
537    ///
538    /// This can be useful to provide more control over how files are written, for example
539    /// to write columns in parallel. See the example on [`ArrowColumnWriter`].
540    pub fn into_serialized_writer(
541        mut self,
542    ) -> Result<(SerializedFileWriter<W>, ArrowRowGroupWriterFactory)> {
543        self.flush()?;
544        Ok((self.writer, self.row_group_writer_factory))
545    }
546}
547
548impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
549    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
550        self.write(batch).map_err(|e| e.into())
551    }
552
553    fn close(self) -> std::result::Result<(), ArrowError> {
554        self.close()?;
555        Ok(())
556    }
557}
558
/// Arrow-specific configuration settings for writing parquet files.
///
/// See [`ArrowWriter`] for how to configure the writer.
#[derive(Debug, Clone, Default)]
pub struct ArrowWriterOptions {
    /// Parquet writer properties, see [`Self::with_properties`]
    properties: WriterProperties,
    /// When `true` the encoded Arrow schema is not added to the file metadata,
    /// see [`Self::with_skip_arrow_metadata`]
    skip_arrow_metadata: bool,
    /// Name of the root parquet schema element, see [`Self::with_schema_root`]
    schema_root: Option<String>,
    /// Explicit Parquet schema to use instead of converting the Arrow schema,
    /// see [`Self::with_parquet_schema`]
    schema_descr: Option<SchemaDescriptor>,
    /// Factory for the stores that buffer completed pages, see
    /// [`Self::with_page_store_factory`]
    page_store_factory: Option<Arc<dyn PageStoreFactory>>,
}
570
571impl ArrowWriterOptions {
572    /// Creates a new [`ArrowWriterOptions`] with the default settings.
573    pub fn new() -> Self {
574        Self::default()
575    }
576
577    /// Sets the [`WriterProperties`] for writing parquet files.
578    pub fn with_properties(self, properties: WriterProperties) -> Self {
579        Self { properties, ..self }
580    }
581
582    /// Sets the [`PageStoreFactory`] used to buffer completed pages while a row
583    /// group is being written.
584    ///
585    /// By default (an [`InMemoryPageStore`] per column chunk) completed pages
586    /// are buffered on the heap until the row group is flushed, so peak memory
587    /// grows with the row group size. Supplying a factory that spills to a temp
588    /// file or object storage instead bounds peak write memory, decoupling it
589    /// from the row group size while keeping large, read-optimal row groups.
590    ///
591    /// # Example: a custom [`PageStore`]
592    ///
593    /// A store only has to map an opaque, store-allocated [`PageKey`] to a blob
594    /// and hand the blob back once. The keys need not be dense or sequential —
595    /// here a `HashMap`-backed store mints sparse handles, proving the writer
596    /// relies only on the opaque-handle contract. A real spilling backend would
597    /// write the bytes to a temp file in `put` and read them back in `take`.
598    ///
599    /// ```
600    /// # use std::collections::HashMap;
601    /// # use std::sync::Arc;
602    /// # use bytes::Bytes;
603    /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
604    /// # use parquet::arrow::arrow_writer::{
605    /// #     ArrowWriter, ArrowWriterOptions, PageKey, PageStore, PageStoreArgs, PageStoreFactory,
606    /// # };
607    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
608    /// # use parquet::errors::{ParquetError, Result};
609    /// #[derive(Default)]
610    /// struct MapPageStore {
611    ///     blobs: HashMap<u64, Bytes>,
612    ///     next: u64,
613    /// }
614    ///
615    /// impl PageStore for MapPageStore {
616    ///     fn put(&mut self, value: Bytes) -> Result<PageKey> {
617    ///         // Mint a sparse handle (every other integer) to show the writer
618    ///         // never assumes anything about the key's value.
619    ///         let key = PageKey::new(self.next);
620    ///         self.next += 2;
621    ///         self.blobs.insert(key.get(), value);
622    ///         Ok(key)
623    ///     }
624    ///
625    ///     fn take(&mut self, key: PageKey) -> Result<Bytes> {
626    ///         self.blobs
627    ///             .remove(&key.get())
628    ///             .ok_or_else(|| ParquetError::General(format!("invalid key {}", key.get())))
629    ///     }
630    /// }
631    ///
632    /// #[derive(Debug)]
633    /// struct MapPageStoreFactory;
634    ///
635    /// impl PageStoreFactory for MapPageStoreFactory {
636    ///     fn create(&self, args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
637    ///         // `args` exposes the column index and descriptor (physical/logical
638    ///         // type, path), so a real backend could spill only large columns.
639    ///         let _ = (args.column_index(), args.column_descriptor());
640    ///         Ok(Box::new(MapPageStore::default()))
641    ///     }
642    /// }
643    ///
644    /// let col = Arc::new(Int64Array::from_iter_values(0..1000)) as ArrayRef;
645    /// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
646    ///
647    /// let options =
648    ///     ArrowWriterOptions::new().with_page_store_factory(Arc::new(MapPageStoreFactory));
649    /// let mut buffer = Vec::new();
650    /// let mut writer =
651    ///     ArrowWriter::try_new_with_options(&mut buffer, to_write.schema(), options).unwrap();
652    /// writer.write(&to_write).unwrap();
653    /// writer.close().unwrap();
654    ///
655    /// // The file is byte-identical to one written with the default store.
656    /// let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap();
657    /// assert_eq!(to_write, reader.next().unwrap().unwrap());
658    /// ```
659    pub fn with_page_store_factory(self, page_store_factory: Arc<dyn PageStoreFactory>) -> Self {
660        Self {
661            page_store_factory: Some(page_store_factory),
662            ..self
663        }
664    }
665
666    /// Skip encoding the embedded arrow metadata (defaults to `false`)
667    ///
668    /// Parquet files generated by the [`ArrowWriter`] contain embedded arrow schema
669    /// by default.
670    ///
671    /// Set `skip_arrow_metadata` to true, to skip encoding the embedded metadata.
672    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
673        Self {
674            skip_arrow_metadata,
675            ..self
676        }
677    }
678
679    /// Set the name of the root parquet schema element (defaults to `"arrow_schema"`)
680    pub fn with_schema_root(self, schema_root: String) -> Self {
681        Self {
682            schema_root: Some(schema_root),
683            ..self
684        }
685    }
686
687    /// Explicitly specify the Parquet schema to be used
688    ///
689    /// If omitted (the default), the [`ArrowSchemaConverter`] is used to compute the
690    /// Parquet [`SchemaDescriptor`]. This may be used When the [`SchemaDescriptor`] is
691    /// already known or must be calculated using custom logic.
692    pub fn with_parquet_schema(self, schema_descr: SchemaDescriptor) -> Self {
693        Self {
694            schema_descr: Some(schema_descr),
695            ..self
696        }
697    }
698}
699
/// A single column chunk produced by [`ArrowColumnWriter`].
///
/// Holds the serialized page blobs (each page's header ‖ compressed data, in
/// write order) in a [`PageStore`], plus the handles needed to read them back,
/// in order, when the chunk is spliced into the output file.
struct ArrowColumnChunkData {
    /// NOTE(review): presumably the total serialized length of the chunk in bytes.
    /// It starts at 0 and is not updated by the methods of this type — confirm
    /// against the code that writes pages into the chunk.
    length: usize,
    /// Backend holding the data-page blobs
    store: Box<dyn PageStore>,
    /// Handles returned by the store for each data-page blob, in write order
    keys: Vec<PageKey>,
    /// The dictionary page's serialized blobs (header ‖ data), held in memory
    /// rather than the store.
    ///
    /// A dictionary page is produced at most once and bounded by
    /// `dict_page_size_limit`, but it must be written *first* in the chunk even
    /// though the data pages reach the writer before it (see
    /// [`PageWriter::defers_dictionary_ordering`]). Spilling it would only
    /// round-trip ~1 page to the backend and straight back, so it is kept here
    /// and emitted ahead of the data pages at splice. Empty for non-dictionary
    /// columns.
    dictionary: Vec<Bytes>,
}
721
722impl ArrowColumnChunkData {
723    fn new(store: Box<dyn PageStore>) -> Self {
724        Self {
725            length: 0,
726            store,
727            keys: Vec::new(),
728            dictionary: Vec::new(),
729        }
730    }
731
732    /// Append a data-page blob to the store, recording its handle in write
733    /// order.
734    fn push(&mut self, value: Bytes) -> Result<()> {
735        let key = self.store.put(value)?;
736        self.keys.push(key);
737        Ok(())
738    }
739
740    /// Retain a dictionary-page blob in memory (emitted first at splice).
741    fn push_dictionary(&mut self, value: Bytes) {
742        self.dictionary.push(value);
743    }
744
745    /// Total serialized size of the in-memory dictionary page, in bytes.
746    fn dictionary_len(&self) -> usize {
747        self.dictionary.iter().map(Bytes::len).sum()
748    }
749
750    /// Bytes this chunk currently holds on the heap: whatever the store keeps
751    /// resident (zero for a spilling backend) plus the in-memory dictionary
752    /// page.
753    fn memory_size(&self) -> usize {
754        self.store.memory_size() + self.dictionary_len()
755    }
756}
757
/// A streaming [`Read`] over one column chunk's buffered pages, in final file
/// order: the in-memory dictionary page (if any) first, then the data pages.
///
/// Each data-page blob is taken back out of the [`PageStore`] *as it is
/// consumed* and released immediately afterwards, so splicing a chunk into the
/// output file never materializes more than a single page in memory at a time.
/// This is what keeps the splice phase within the memory bound for a spilling
/// backend (an in-memory store already holds the bytes, so it is unaffected).
struct StreamingColumnChunkReader {
    /// Dictionary-page blobs, emitted before any data page.
    dictionary: IntoIter<Bytes>,
    /// Store the data-page blobs are taken from, one at a time.
    store: Box<dyn PageStore>,
    /// Handles of the data pages not yet taken from the store, in write order.
    keys: IntoIter<PageKey>,
    /// The blob currently being drained into the output; emptied as it is read.
    current: Bytes,
}
774
775impl StreamingColumnChunkReader {
776    fn new(data: ArrowColumnChunkData) -> Self {
777        Self {
778            dictionary: data.dictionary.into_iter(),
779            store: data.store,
780            keys: data.keys.into_iter(),
781            current: Bytes::new(),
782        }
783    }
784}
785
786impl Read for StreamingColumnChunkReader {
787    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
788        // Refill from the next blob whenever the current one is drained: the
789        // dictionary page first, then each data page from the store.
790        while self.current.is_empty() {
791            if let Some(blob) = self.dictionary.next() {
792                self.current = blob;
793            } else if let Some(key) = self.keys.next() {
794                self.current = self.store.take(key).map_err(std::io::Error::other)?;
795            } else {
796                return Ok(0);
797            }
798        }
799
800        let len = self.current.len().min(out.len());
801        let b = self.current.split_to(len);
802        out[..len].copy_from_slice(&b);
803        Ok(len)
804    }
805}
806
807/// A shared [`ArrowColumnChunkData`]
808///
809/// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access via
810/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows
811type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
812
813struct ArrowPageWriter {
814    buffer: SharedColumnChunk,
815    #[cfg(feature = "encryption")]
816    page_encryptor: Option<PageEncryptor>,
817}
818
819impl ArrowPageWriter {
820    /// Create a page writer that buffers completed pages in `store`.
821    fn new(store: Box<dyn PageStore>) -> Self {
822        Self {
823            buffer: Arc::new(Mutex::new(ArrowColumnChunkData::new(store))),
824            #[cfg(feature = "encryption")]
825            page_encryptor: None,
826        }
827    }
828
829    #[cfg(feature = "encryption")]
830    pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
831        self.page_encryptor = page_encryptor;
832        self
833    }
834
835    #[cfg(feature = "encryption")]
836    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
837        self.page_encryptor.as_mut()
838    }
839
840    #[cfg(not(feature = "encryption"))]
841    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
842        None
843    }
844}
845
846impl PageWriter for ArrowPageWriter {
847    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
848        let page = match self.page_encryptor_mut() {
849            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
850            None => page,
851        };
852
853        let page_header = page.to_thrift_header()?;
854        let header = {
855            let mut header = Vec::with_capacity(1024);
856
857            match self.page_encryptor_mut() {
858                Some(page_encryptor) => {
859                    page_encryptor.encrypt_page_header(&page_header, &mut header)?;
860                    if page.compressed_page().is_data_page() {
861                        page_encryptor.increment_page();
862                    }
863                }
864                None => {
865                    let mut protocol = ThriftCompactOutputProtocol::new(&mut header);
866                    page_header.write_thrift(&mut protocol)?;
867                }
868            };
869
870            Bytes::from(header)
871        };
872
873        let mut buf = self.buffer.try_lock().unwrap();
874
875        let data = page.compressed_page().buffer().clone();
876        let compressed_size = data.len() + header.len();
877
878        let mut spec = PageWriteSpec::new();
879        spec.page_type = page.page_type();
880        spec.num_values = page.num_values();
881        spec.uncompressed_size = page.uncompressed_size() + header.len();
882        spec.offset = buf.length as u64;
883        spec.compressed_size = compressed_size;
884        spec.bytes_written = compressed_size as u64;
885
886        buf.length += compressed_size;
887        if spec.page_type == PageType::DICTIONARY_PAGE {
888            // Held in memory and emitted first at splice — see
889            // `ArrowColumnChunkData::dictionary`.
890            buf.push_dictionary(header);
891            buf.push_dictionary(data);
892        } else {
893            buf.push(header)?;
894            buf.push(data)?;
895        }
896
897        Ok(spec)
898    }
899
900    fn defers_dictionary_ordering(&self) -> bool {
901        // The Arrow chunk is buffered in full and spliced at row-group flush, so
902        // data pages may be accepted before the dictionary page and reordered
903        // then. This lets `GenericColumnWriter` stream dictionary-column data
904        // pages straight through instead of buffering them in memory.
905        true
906    }
907
908    fn buffered_memory_size(&self) -> usize {
909        // Only what is actually resident: a spilling store reports ~0 here even
910        // though the chunk's bytes have all passed through it.
911        self.buffer.try_lock().unwrap().memory_size()
912    }
913
914    fn close(&mut self) -> Result<()> {
915        Ok(())
916    }
917}
918
919/// A leaf column that can be encoded by [`ArrowColumnWriter`]
920#[derive(Debug)]
921pub struct ArrowLeafColumn(ArrayLevels);
922
923/// Computes the [`ArrowLeafColumn`] for a potentially nested [`ArrayRef`]
924///
925/// This function can be used along with [`get_column_writers`] to encode
926/// individual columns in parallel. See example on [`ArrowColumnWriter`]
927pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
928    let levels = calculate_array_levels(array, field)?;
929    Ok(levels.into_iter().map(ArrowLeafColumn).collect())
930}
931
932/// The data for a single column chunk, see [`ArrowColumnWriter`]
933pub struct ArrowColumnChunk {
934    data: ArrowColumnChunkData,
935    close: ColumnCloseResult,
936}
937
938impl std::fmt::Debug for ArrowColumnChunk {
939    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
940        f.debug_struct("ArrowColumnChunk")
941            .field("length", &self.data.length)
942            .finish_non_exhaustive()
943    }
944}
945
946impl ArrowColumnChunk {
947    /// Returns the [`ColumnCloseResult`] produced when the chunk was closed.
948    ///
949    /// Exposes encoding information, collected statistics, and the optional
950    /// [`ColumnIndexMetaData`](crate::file::page_index::column_index::ColumnIndexMetaData)
951    /// / [`OffsetIndexMetaData`](crate::file::page_index::offset_index::OffsetIndexMetaData)
952    /// gathered for the column chunk.
953    pub fn close(&self) -> &ColumnCloseResult {
954        &self.close
955    }
956
957    /// Returns a mutable reference to the [`ColumnCloseResult`].
958    ///
959    /// This allows callers to mutate the close result before the chunk is
960    /// appended to a row group — for example, clearing `column_index` or
961    /// `bloom_filter` based on a dynamic rule that inspects the encodings and
962    /// collected page statistics.
963    pub fn close_mut(&mut self) -> &mut ColumnCloseResult {
964        &mut self.close
965    }
966
967    /// Splices this column's buffered pages into the row group, streaming them
968    /// back out of the [`PageStore`] one page at a time.
969    pub fn append_to_row_group<W: Write + Send>(
970        self,
971        writer: &mut SerializedRowGroupWriter<'_, W>,
972    ) -> Result<()> {
973        let ArrowColumnChunk { data, close } = self;
974
975        // The dictionary page is produced *after* the data pages on this path (so
976        // they can stream straight through) but must be written *first*, so move
977        // it ahead of the data pages in the recorded offsets before the splice.
978        let close = close.update_dictionary_location(data.dictionary_len())?;
979
980        let reader = StreamingColumnChunkReader::new(data);
981        writer.append_column_from_read(reader, close)
982    }
983}
984
985/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
986///
987/// `ArrowColumnWriter` instances can be created using an [`ArrowRowGroupWriterFactory`];
988///
989/// Note: This is a low-level interface for applications that require
990/// fine-grained control of encoding (e.g. encoding using multiple threads),
991/// see [`ArrowWriter`] for a higher-level interface
992///
/// # Example: Encoding two Arrow Arrays in Parallel
994/// ```
995/// // The arrow schema
996/// # use std::sync::Arc;
997/// # use arrow_array::*;
998/// # use arrow_schema::*;
999/// # use parquet::arrow::ArrowSchemaConverter;
1000/// # use parquet::arrow::arrow_writer::{compute_leaves, ArrowColumnChunk, ArrowLeafColumn, ArrowRowGroupWriterFactory};
1001/// # use parquet::file::properties::WriterProperties;
1002/// # use parquet::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
1003/// #
1004/// let schema = Arc::new(Schema::new(vec![
1005///     Field::new("i32", DataType::Int32, false),
1006///     Field::new("f32", DataType::Float32, false),
1007/// ]));
1008///
1009/// // Compute the parquet schema
1010/// let props = Arc::new(WriterProperties::default());
1011/// let parquet_schema = ArrowSchemaConverter::new()
1012///   .with_coerce_types(props.coerce_types())
1013///   .convert(&schema)
1014///   .unwrap();
1015///
1016/// // Create parquet writer
1017/// let root_schema = parquet_schema.root_schema_ptr();
1018/// // write to memory in the example, but this could be a File
1019/// let mut out = Vec::with_capacity(1024);
1020/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone())
1021///   .unwrap();
1022///
1023/// // Create a factory for building Arrow column writers
1024/// let row_group_factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(&schema));
1025/// // Create column writers for the 0th row group
1026/// let col_writers = row_group_factory.create_column_writers(0).unwrap();
1027///
1028/// // Spawn a worker thread for each column
1029/// //
1030/// // Note: This is for demonstration purposes, a thread-pool e.g. rayon or tokio, would be better.
1031/// // The `map` produces an iterator of type `tuple of (thread handle, send channel)`.
1032/// let mut workers: Vec<_> = col_writers
1033///     .into_iter()
1034///     .map(|mut col_writer| {
1035///         let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
1036///         let handle = std::thread::spawn(move || {
1037///             // receive Arrays to encode via the channel
1038///             for col in recv {
1039///                 col_writer.write(&col)?;
1040///             }
1041///             // once the input is complete, close the writer
1042///             // to return the newly created ArrowColumnChunk
1043///             col_writer.close()
1044///         });
1045///         (handle, send)
1046///     })
1047///     .collect();
1048///
1049/// // Start row group
1050/// let mut row_group_writer: SerializedRowGroupWriter<'_, _> = writer
1051///   .next_row_group()
1052///   .unwrap();
1053///
1054/// // Create some example input columns to encode
1055/// let to_write = vec![
1056///     Arc::new(Int32Array::from_iter_values([1, 2, 3])) as _,
1057///     Arc::new(Float32Array::from_iter_values([1., 45., -1.])) as _,
1058/// ];
1059///
1060/// // Send the input columns to the workers
1061/// let mut worker_iter = workers.iter_mut();
1062/// for (arr, field) in to_write.iter().zip(&schema.fields) {
1063///     for leaves in compute_leaves(field, arr).unwrap() {
1064///         worker_iter.next().unwrap().1.send(leaves).unwrap();
1065///     }
1066/// }
1067///
1068/// // Wait for the workers to complete encoding, and append
1069/// // the resulting column chunks to the row group (and the file)
1070/// for (handle, send) in workers {
1071///     drop(send); // Drop send side to signal termination
1072///     // wait for the worker to send the completed chunk
1073///     let chunk: ArrowColumnChunk = handle.join().unwrap().unwrap();
1074///     chunk.append_to_row_group(&mut row_group_writer).unwrap();
1075/// }
1076/// // Close the row group which writes to the underlying file
1077/// row_group_writer.close().unwrap();
1078///
1079/// let metadata = writer.close().unwrap();
1080/// assert_eq!(metadata.file_metadata().num_rows(), 3);
1081/// ```
1082pub struct ArrowColumnWriter {
1083    writer: ArrowColumnWriterImpl,
1084    chunk: SharedColumnChunk,
1085}
1086
1087impl std::fmt::Debug for ArrowColumnWriter {
1088    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1089        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
1090    }
1091}
1092
1093enum ArrowColumnWriterImpl {
1094    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
1095    Column(ColumnWriter<'static>),
1096}
1097
1098impl ArrowColumnWriter {
1099    /// Write an [`ArrowLeafColumn`]
1100    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
1101        self.write_internal(&col.0)
1102    }
1103
1104    /// Write with content-defined chunking, inserting page flushes at chunk boundaries.
1105    fn write_with_chunker(
1106        &mut self,
1107        col: &ArrowLeafColumn,
1108        chunker: &mut ContentDefinedChunker,
1109    ) -> Result<()> {
1110        let levels = &col.0;
1111        let chunks = chunker.get_arrow_chunks(
1112            levels.def_level_data().as_ref(),
1113            levels.rep_level_data().as_ref(),
1114            levels.array(),
1115        )?;
1116
1117        let num_chunks = chunks.len();
1118        for (i, chunk) in chunks.iter().enumerate() {
1119            let chunk_levels = levels.slice_for_chunk(chunk);
1120            self.write_internal(&chunk_levels)?;
1121
1122            // Add a page break after each chunk except the last
1123            if i + 1 < num_chunks {
1124                match &mut self.writer {
1125                    ArrowColumnWriterImpl::Column(c) => c.add_data_page()?,
1126                    ArrowColumnWriterImpl::ByteArray(c) => c.add_data_page()?,
1127                }
1128            }
1129        }
1130        Ok(())
1131    }
1132
1133    fn write_internal(&mut self, levels: &ArrayLevels) -> Result<()> {
1134        match &mut self.writer {
1135            ArrowColumnWriterImpl::Column(c) => {
1136                let leaf = levels.array();
1137                match leaf.as_any_dictionary_opt() {
1138                    Some(dictionary) => {
1139                        let materialized =
1140                            arrow_select::take::take(dictionary.values(), dictionary.keys(), None)?;
1141                        write_leaf(c, &materialized, levels)?
1142                    }
1143                    None => write_leaf(c, leaf, levels)?,
1144                };
1145            }
1146            ArrowColumnWriterImpl::ByteArray(c) => {
1147                write_primitive(c, levels.array().as_ref(), levels)?;
1148            }
1149        }
1150        Ok(())
1151    }
1152
1153    /// Close this column returning the written [`ArrowColumnChunk`]
1154    pub fn close(self) -> Result<ArrowColumnChunk> {
1155        let close = match self.writer {
1156            ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
1157            ArrowColumnWriterImpl::Column(c) => c.close()?,
1158        };
1159        let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
1160        let data = chunk.into_inner().unwrap();
1161        Ok(ArrowColumnChunk { data, close })
1162    }
1163
1164    /// Returns the estimated total memory usage by the writer.
1165    ///
1166    /// This  [`Self::get_estimated_total_bytes`] this is an estimate
1167    /// of the current memory usage and not it's anticipated encoded size.
1168    ///
1169    /// This includes:
1170    /// 1. Data buffered in encoded form
1171    /// 2. Data buffered in un-encoded form (e.g. `usize` dictionary keys)
1172    ///
1173    /// This value should be greater than or equal to [`Self::get_estimated_total_bytes`]
1174    pub fn memory_size(&self) -> usize {
1175        match &self.writer {
1176            ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
1177            ArrowColumnWriterImpl::Column(c) => c.memory_size(),
1178        }
1179    }
1180
1181    /// Returns the estimated total encoded bytes for this column writer.
1182    ///
1183    /// This includes:
1184    /// 1. Data buffered in encoded form
1185    /// 2. An estimate of how large the data buffered in un-encoded form would be once encoded
1186    ///
1187    /// This value should be less than or equal to [`Self::memory_size`]
1188    pub fn get_estimated_total_bytes(&self) -> usize {
1189        match &self.writer {
1190            ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
1191            ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
1192        }
1193    }
1194}
1195
1196/// Encodes [`RecordBatch`] to a parquet row group
1197///
1198/// Note: this structure is created by [`ArrowRowGroupWriterFactory`] internally used to
1199/// create [`ArrowRowGroupWriter`]s, but it is not exposed publicly.
1200///
1201/// See the example on [`ArrowColumnWriter`] for how to encode columns in parallel
1202#[derive(Debug)]
1203struct ArrowRowGroupWriter {
1204    writers: Vec<ArrowColumnWriter>,
1205    schema: SchemaRef,
1206    buffered_rows: usize,
1207}
1208
1209impl ArrowRowGroupWriter {
1210    fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
1211        Self {
1212            writers,
1213            schema: arrow.clone(),
1214            buffered_rows: 0,
1215        }
1216    }
1217
1218    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
1219        self.buffered_rows += batch.num_rows();
1220        let mut writers = self.writers.iter_mut();
1221        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
1222            for leaf in compute_leaves(field.as_ref(), column)? {
1223                writers.next().unwrap().write(&leaf)?;
1224            }
1225        }
1226        Ok(())
1227    }
1228
1229    fn write_with_chunkers(
1230        &mut self,
1231        batch: &RecordBatch,
1232        chunkers: &mut [ContentDefinedChunker],
1233    ) -> Result<()> {
1234        self.buffered_rows += batch.num_rows();
1235        let mut writers = self.writers.iter_mut();
1236        let mut chunkers = chunkers.iter_mut();
1237        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
1238            for leaf in compute_leaves(field.as_ref(), column)? {
1239                writers
1240                    .next()
1241                    .unwrap()
1242                    .write_with_chunker(&leaf, chunkers.next().unwrap())?;
1243            }
1244        }
1245        Ok(())
1246    }
1247
1248    /// Returns the estimated total encoded bytes for this row group
1249    fn get_estimated_total_bytes(&self) -> usize {
1250        self.writers
1251            .iter()
1252            .map(|x| x.get_estimated_total_bytes())
1253            .sum()
1254    }
1255
1256    fn close(self) -> Result<Vec<ArrowColumnChunk>> {
1257        self.writers
1258            .into_iter()
1259            .map(|writer| writer.close())
1260            .collect()
1261    }
1262}
1263
1264/// Factory that creates new column writers for each row group in the Parquet file.
1265///
1266/// You can create this structure via an [`ArrowWriter::into_serialized_writer`].
1267/// See the example on [`ArrowColumnWriter`] for how to encode columns in parallel
1268#[derive(Debug)]
1269pub struct ArrowRowGroupWriterFactory {
1270    schema: SchemaDescPtr,
1271    arrow_schema: SchemaRef,
1272    props: WriterPropertiesPtr,
1273    page_store_factory: Arc<dyn PageStoreFactory>,
1274    #[cfg(feature = "encryption")]
1275    file_encryptor: Option<Arc<FileEncryptor>>,
1276}
1277
1278impl ArrowRowGroupWriterFactory {
1279    /// Create a new [`ArrowRowGroupWriterFactory`] for the provided file writer and Arrow schema
1280    pub fn new<W: Write + Send>(
1281        file_writer: &SerializedFileWriter<W>,
1282        arrow_schema: SchemaRef,
1283    ) -> Self {
1284        let schema = Arc::clone(file_writer.schema_descr_ptr());
1285        let props = Arc::clone(file_writer.properties());
1286        Self {
1287            schema,
1288            arrow_schema,
1289            props,
1290            page_store_factory: Arc::new(InMemoryPageStoreFactory),
1291            #[cfg(feature = "encryption")]
1292            file_encryptor: file_writer.file_encryptor(),
1293        }
1294    }
1295
1296    /// Set the [`PageStoreFactory`] used to allocate the buffer for each column
1297    /// chunk, e.g. to spill completed pages to a temp file or object storage
1298    /// instead of the heap. Defaults to [`InMemoryPageStoreFactory`].
1299    pub fn with_page_store_factory(
1300        mut self,
1301        page_store_factory: Arc<dyn PageStoreFactory>,
1302    ) -> Self {
1303        self.page_store_factory = page_store_factory;
1304        self
1305    }
1306
1307    fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
1308        let writers = self.create_column_writers(row_group_index)?;
1309        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
1310    }
1311
1312    /// Create column writers for a new row group, with the given row group index
1313    pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
1314        let mut writers = Vec::with_capacity(self.arrow_schema.fields.len());
1315        let mut leaves = self.schema.columns().iter();
1316        let column_factory = self.column_writer_factory(row_group_index);
1317        for field in &self.arrow_schema.fields {
1318            column_factory.get_arrow_column_writer(
1319                field.data_type(),
1320                &self.props,
1321                &mut leaves,
1322                &mut writers,
1323            )?;
1324        }
1325        Ok(writers)
1326    }
1327
1328    #[cfg(feature = "encryption")]
1329    fn column_writer_factory(&self, row_group_idx: usize) -> ArrowColumnWriterFactory {
1330        ArrowColumnWriterFactory::new()
1331            .with_page_store_factory(self.page_store_factory.clone())
1332            .with_file_encryptor(row_group_idx, self.file_encryptor.clone())
1333    }
1334
1335    #[cfg(not(feature = "encryption"))]
1336    fn column_writer_factory(&self, _row_group_idx: usize) -> ArrowColumnWriterFactory {
1337        ArrowColumnWriterFactory::new().with_page_store_factory(self.page_store_factory.clone())
1338    }
1339}
1340
1341/// Returns [`ArrowColumnWriter`]s for each column in a given schema
1342#[deprecated(since = "57.0.0", note = "Use `ArrowRowGroupWriterFactory` instead")]
1343pub fn get_column_writers(
1344    parquet: &SchemaDescriptor,
1345    props: &WriterPropertiesPtr,
1346    arrow: &SchemaRef,
1347) -> Result<Vec<ArrowColumnWriter>> {
1348    let mut writers = Vec::with_capacity(arrow.fields.len());
1349    let mut leaves = parquet.columns().iter();
1350    let column_factory = ArrowColumnWriterFactory::new();
1351    for field in &arrow.fields {
1352        column_factory.get_arrow_column_writer(
1353            field.data_type(),
1354            props,
1355            &mut leaves,
1356            &mut writers,
1357        )?;
1358    }
1359    Ok(writers)
1360}
1361
1362/// Creates [`ArrowColumnWriter`] instances
1363struct ArrowColumnWriterFactory {
1364    /// Allocates the per-column-chunk [`PageStore`] backing each page writer.
1365    page_store_factory: Arc<dyn PageStoreFactory>,
1366    #[cfg(feature = "encryption")]
1367    row_group_index: usize,
1368    #[cfg(feature = "encryption")]
1369    file_encryptor: Option<Arc<FileEncryptor>>,
1370}
1371
1372impl ArrowColumnWriterFactory {
1373    pub fn new() -> Self {
1374        Self {
1375            page_store_factory: Arc::new(InMemoryPageStoreFactory),
1376            #[cfg(feature = "encryption")]
1377            row_group_index: 0,
1378            #[cfg(feature = "encryption")]
1379            file_encryptor: None,
1380        }
1381    }
1382
1383    /// Use `page_store_factory` to allocate the buffer for each column chunk.
1384    pub fn with_page_store_factory(
1385        mut self,
1386        page_store_factory: Arc<dyn PageStoreFactory>,
1387    ) -> Self {
1388        self.page_store_factory = page_store_factory;
1389        self
1390    }
1391
1392    #[cfg(feature = "encryption")]
1393    pub fn with_file_encryptor(
1394        mut self,
1395        row_group_index: usize,
1396        file_encryptor: Option<Arc<FileEncryptor>>,
1397    ) -> Self {
1398        self.row_group_index = row_group_index;
1399        self.file_encryptor = file_encryptor;
1400        self
1401    }
1402
1403    #[cfg(feature = "encryption")]
1404    fn create_page_writer(
1405        &self,
1406        column_descriptor: &ColumnDescPtr,
1407        column_index: usize,
1408    ) -> Result<Box<ArrowPageWriter>> {
1409        let column_path = column_descriptor.path().string();
1410        let page_encryptor = PageEncryptor::create_if_column_encrypted(
1411            &self.file_encryptor,
1412            self.row_group_index,
1413            column_index,
1414            &column_path,
1415        )?;
1416        let args = PageStoreArgs::new(column_index, column_descriptor);
1417        let store = self.page_store_factory.create(&args)?;
1418        Ok(Box::new(
1419            ArrowPageWriter::new(store).with_encryptor(page_encryptor),
1420        ))
1421    }
1422
1423    #[cfg(not(feature = "encryption"))]
1424    fn create_page_writer(
1425        &self,
1426        column_descriptor: &ColumnDescPtr,
1427        column_index: usize,
1428    ) -> Result<Box<ArrowPageWriter>> {
1429        let args = PageStoreArgs::new(column_index, column_descriptor);
1430        let store = self.page_store_factory.create(&args)?;
1431        Ok(Box::new(ArrowPageWriter::new(store)))
1432    }
1433
1434    /// Gets an [`ArrowColumnWriter`] for the given `data_type`, appending the
1435    /// output ColumnDesc to `leaves` and the column writers to `out`
1436    fn get_arrow_column_writer(
1437        &self,
1438        data_type: &ArrowDataType,
1439        props: &WriterPropertiesPtr,
1440        leaves: &mut Iter<'_, ColumnDescPtr>,
1441        out: &mut Vec<ArrowColumnWriter>,
1442    ) -> Result<()> {
1443        // Instantiate writers for normal columns
1444        let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1445            let page_writer = self.create_page_writer(desc, out.len())?;
1446            let chunk = page_writer.buffer.clone();
1447            let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
1448            Ok(ArrowColumnWriter {
1449                chunk,
1450                writer: ArrowColumnWriterImpl::Column(writer),
1451            })
1452        };
1453
1454        // Instantiate writers for byte arrays (e.g. Utf8,  Binary, etc)
1455        let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1456            let page_writer = self.create_page_writer(desc, out.len())?;
1457            let chunk = page_writer.buffer.clone();
1458            let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
1459            Ok(ArrowColumnWriter {
1460                chunk,
1461                writer: ArrowColumnWriterImpl::ByteArray(writer),
1462            })
1463        };
1464
1465        match data_type {
1466            _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
1467            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
1468                out.push(col(leaves.next().unwrap())?)
1469            }
1470            ArrowDataType::LargeBinary
1471            | ArrowDataType::Binary
1472            | ArrowDataType::Utf8
1473            | ArrowDataType::LargeUtf8
1474            | ArrowDataType::BinaryView
1475            | ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
1476            ArrowDataType::List(f)
1477            | ArrowDataType::LargeList(f)
1478            | ArrowDataType::FixedSizeList(f, _)
1479            | ArrowDataType::ListView(f)
1480            | ArrowDataType::LargeListView(f) => {
1481                self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
1482            }
1483            ArrowDataType::Struct(fields) => {
1484                for field in fields {
1485                    self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
1486                }
1487            }
1488            ArrowDataType::Map(f, _) => match f.data_type() {
1489                ArrowDataType::Struct(f) => {
1490                    self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
1491                    self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
1492                }
1493                _ => unreachable!("invalid map type"),
1494            },
1495            ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1496                ArrowDataType::Utf8
1497                | ArrowDataType::LargeUtf8
1498                | ArrowDataType::Binary
1499                | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
1500                ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
1501                    out.push(bytes(leaves.next().unwrap())?)
1502                }
1503                ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
1504                _ => out.push(col(leaves.next().unwrap())?),
1505            },
1506            ArrowDataType::RunEndEncoded(_, value_field) => {
1507                self.get_arrow_column_writer(value_field.data_type(), props, leaves, out)?
1508            }
1509            _ => {
1510                return Err(ParquetError::NYI(format!(
1511                    "Attempting to write an Arrow type {data_type} to parquet that is not yet implemented"
1512                )));
1513            }
1514        }
1515        Ok(())
1516    }
1517}
1518
1519fn write_leaf(
1520    writer: &mut ColumnWriter<'_>,
1521    column: &dyn arrow_array::Array,
1522    levels: &ArrayLevels,
1523) -> Result<usize> {
1524    let indices = levels.non_null_indices();
1525
1526    match writer {
1527        // Note: this should match the contents of arrow_to_parquet_type
1528        ColumnWriter::Int32ColumnWriter(typed) => {
1529            match column.data_type() {
1530                ArrowDataType::Null => {
1531                    let array = Int32Array::new_null(column.len());
1532                    write_primitive(typed, array.values(), levels)
1533                }
1534                ArrowDataType::Int8 => {
1535                    let array: Int32Array = column.as_primitive::<Int8Type>().unary(|x| x as i32);
1536                    write_primitive(typed, array.values(), levels)
1537                }
1538                ArrowDataType::Int16 => {
1539                    let array: Int32Array = column.as_primitive::<Int16Type>().unary(|x| x as i32);
1540                    write_primitive(typed, array.values(), levels)
1541                }
1542                ArrowDataType::Int32 => {
1543                    write_primitive(typed, column.as_primitive::<Int32Type>().values(), levels)
1544                }
1545                ArrowDataType::UInt8 => {
1546                    let array: Int32Array = column.as_primitive::<UInt8Type>().unary(|x| x as i32);
1547                    write_primitive(typed, array.values(), levels)
1548                }
1549                ArrowDataType::UInt16 => {
1550                    let array: Int32Array = column.as_primitive::<UInt16Type>().unary(|x| x as i32);
1551                    write_primitive(typed, array.values(), levels)
1552                }
1553                ArrowDataType::UInt32 => {
1554                    // follow C++ implementation and use overflow/reinterpret cast from  u32 to i32 which will map
1555                    // `(i32::MAX as u32)..u32::MAX` to `i32::MIN..0`
1556                    let array = column.as_primitive::<UInt32Type>();
1557                    write_primitive(typed, array.values().inner().typed_data(), levels)
1558                }
1559                ArrowDataType::Date32 => {
1560                    let array = column.as_primitive::<Date32Type>();
1561                    write_primitive(typed, array.values(), levels)
1562                }
1563                ArrowDataType::Time32(TimeUnit::Second) => {
1564                    let array = column.as_primitive::<Time32SecondType>();
1565                    write_primitive(typed, array.values(), levels)
1566                }
1567                ArrowDataType::Time32(TimeUnit::Millisecond) => {
1568                    let array = column.as_primitive::<Time32MillisecondType>();
1569                    write_primitive(typed, array.values(), levels)
1570                }
1571                ArrowDataType::Date64 => {
1572                    // If the column is a Date64, we truncate it
1573                    let array: Int32Array = column
1574                        .as_primitive::<Date64Type>()
1575                        .unary(|x| (x / 86_400_000) as _);
1576
1577                    write_primitive(typed, array.values(), levels)
1578                }
1579                ArrowDataType::Decimal32(_, _) => {
1580                    let array = column
1581                        .as_primitive::<Decimal32Type>()
1582                        .unary::<_, Int32Type>(|v| v);
1583                    write_primitive(typed, array.values(), levels)
1584                }
1585                ArrowDataType::Decimal64(_, _) => {
1586                    // use the int32 to represent the decimal with low precision
1587                    let array = column
1588                        .as_primitive::<Decimal64Type>()
1589                        .unary::<_, Int32Type>(|v| v as i32);
1590                    write_primitive(typed, array.values(), levels)
1591                }
1592                ArrowDataType::Decimal128(_, _) => {
1593                    // use the int32 to represent the decimal with low precision
1594                    let array = column
1595                        .as_primitive::<Decimal128Type>()
1596                        .unary::<_, Int32Type>(|v| v as i32);
1597                    write_primitive(typed, array.values(), levels)
1598                }
1599                ArrowDataType::Decimal256(_, _) => {
1600                    // use the int32 to represent the decimal with low precision
1601                    let array = column
1602                        .as_primitive::<Decimal256Type>()
1603                        .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1604                    write_primitive(typed, array.values(), levels)
1605                }
1606                d => Err(ParquetError::General(format!("Cannot coerce {d} to I32"))),
1607            }
1608        }
1609        ColumnWriter::BoolColumnWriter(typed) => {
1610            let array = column.as_boolean();
1611            let values = get_bool_array_slice(array, indices.iter().copied());
1612            typed.write_batch_internal(
1613                values.as_slice(),
1614                None,
1615                levels.def_level_data().as_ref(),
1616                levels.rep_level_data().as_ref(),
1617                None,
1618                None,
1619                None,
1620            )
1621        }
1622        ColumnWriter::Int64ColumnWriter(typed) => {
1623            match column.data_type() {
1624                ArrowDataType::Date64 => {
1625                    let array = column
1626                        .as_primitive::<Date64Type>()
1627                        .reinterpret_cast::<Int64Type>();
1628
1629                    write_primitive(typed, array.values(), levels)
1630                }
1631                ArrowDataType::Int64 => {
1632                    let array = column.as_primitive::<Int64Type>();
1633                    write_primitive(typed, array.values(), levels)
1634                }
1635                ArrowDataType::UInt64 => {
1636                    let values = column.as_primitive::<UInt64Type>().values();
1637                    // follow C++ implementation and use overflow/reinterpret cast from  u64 to i64 which will map
1638                    // `(i64::MAX as u64)..u64::MAX` to `i64::MIN..0`
1639                    let array = values.inner().typed_data::<i64>();
1640                    write_primitive(typed, array, levels)
1641                }
1642                ArrowDataType::Time64(TimeUnit::Microsecond) => {
1643                    let array = column.as_primitive::<Time64MicrosecondType>();
1644                    write_primitive(typed, array.values(), levels)
1645                }
1646                ArrowDataType::Time64(TimeUnit::Nanosecond) => {
1647                    let array = column.as_primitive::<Time64NanosecondType>();
1648                    write_primitive(typed, array.values(), levels)
1649                }
1650                ArrowDataType::Timestamp(unit, _) => match unit {
1651                    TimeUnit::Second => {
1652                        let array = column.as_primitive::<TimestampSecondType>();
1653                        write_primitive(typed, array.values(), levels)
1654                    }
1655                    TimeUnit::Millisecond => {
1656                        let array = column.as_primitive::<TimestampMillisecondType>();
1657                        write_primitive(typed, array.values(), levels)
1658                    }
1659                    TimeUnit::Microsecond => {
1660                        let array = column.as_primitive::<TimestampMicrosecondType>();
1661                        write_primitive(typed, array.values(), levels)
1662                    }
1663                    TimeUnit::Nanosecond => {
1664                        let array = column.as_primitive::<TimestampNanosecondType>();
1665                        write_primitive(typed, array.values(), levels)
1666                    }
1667                },
1668                ArrowDataType::Duration(unit) => match unit {
1669                    TimeUnit::Second => {
1670                        let array = column.as_primitive::<DurationSecondType>();
1671                        write_primitive(typed, array.values(), levels)
1672                    }
1673                    TimeUnit::Millisecond => {
1674                        let array = column.as_primitive::<DurationMillisecondType>();
1675                        write_primitive(typed, array.values(), levels)
1676                    }
1677                    TimeUnit::Microsecond => {
1678                        let array = column.as_primitive::<DurationMicrosecondType>();
1679                        write_primitive(typed, array.values(), levels)
1680                    }
1681                    TimeUnit::Nanosecond => {
1682                        let array = column.as_primitive::<DurationNanosecondType>();
1683                        write_primitive(typed, array.values(), levels)
1684                    }
1685                },
1686                ArrowDataType::Decimal64(_, _) => {
1687                    let array = column
1688                        .as_primitive::<Decimal64Type>()
1689                        .reinterpret_cast::<Int64Type>();
1690                    write_primitive(typed, array.values(), levels)
1691                }
1692                ArrowDataType::Decimal128(_, _) => {
1693                    // use the int64 to represent the decimal with low precision
1694                    let array = column
1695                        .as_primitive::<Decimal128Type>()
1696                        .unary::<_, Int64Type>(|v| v as i64);
1697                    write_primitive(typed, array.values(), levels)
1698                }
1699                ArrowDataType::Decimal256(_, _) => {
1700                    // use the int64 to represent the decimal with low precision
1701                    let array = column
1702                        .as_primitive::<Decimal256Type>()
1703                        .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1704                    write_primitive(typed, array.values(), levels)
1705                }
1706                d => Err(ParquetError::General(format!("Cannot coerce {d} to I64"))),
1707            }
1708        }
1709        ColumnWriter::Int96ColumnWriter(_typed) => {
1710            unreachable!("Currently unreachable because data type not supported")
1711        }
1712        ColumnWriter::FloatColumnWriter(typed) => {
1713            let array = column.as_primitive::<Float32Type>();
1714            write_primitive(typed, array.values(), levels)
1715        }
1716        ColumnWriter::DoubleColumnWriter(typed) => {
1717            let array = column.as_primitive::<Float64Type>();
1718            write_primitive(typed, array.values(), levels)
1719        }
1720        ColumnWriter::ByteArrayColumnWriter(_) => {
1721            unreachable!("should use ByteArrayWriter")
1722        }
1723        ColumnWriter::FixedLenByteArrayColumnWriter(typed) => {
1724            let bytes = match column.data_type() {
1725                ArrowDataType::Interval(interval_unit) => match interval_unit {
1726                    IntervalUnit::YearMonth => {
1727                        let array = column.as_primitive::<IntervalYearMonthType>();
1728                        get_interval_ym_array_slice(array, indices.iter().copied())
1729                    }
1730                    IntervalUnit::DayTime => {
1731                        let array = column.as_primitive::<IntervalDayTimeType>();
1732                        get_interval_dt_array_slice(array, indices.iter().copied())
1733                    }
1734                    _ => {
1735                        return Err(ParquetError::NYI(format!(
1736                            "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
1737                        )));
1738                    }
1739                },
1740                ArrowDataType::FixedSizeBinary(_) => {
1741                    let array = column.as_fixed_size_binary();
1742                    get_fsb_array_slice(array, indices.iter().copied())
1743                }
1744                ArrowDataType::Decimal32(_, _) => {
1745                    let array = column.as_primitive::<Decimal32Type>();
1746                    get_decimal_32_array_slice(array, indices.iter().copied())
1747                }
1748                ArrowDataType::Decimal64(_, _) => {
1749                    let array = column.as_primitive::<Decimal64Type>();
1750                    get_decimal_64_array_slice(array, indices.iter().copied())
1751                }
1752                ArrowDataType::Decimal128(_, _) => {
1753                    let array = column.as_primitive::<Decimal128Type>();
1754                    get_decimal_128_array_slice(array, indices.iter().copied())
1755                }
1756                ArrowDataType::Decimal256(_, _) => {
1757                    let array = column.as_primitive::<Decimal256Type>();
1758                    get_decimal_256_array_slice(array, indices.iter().copied())
1759                }
1760                ArrowDataType::Float16 => {
1761                    let array = column.as_primitive::<Float16Type>();
1762                    get_float_16_array_slice(array, indices.iter().copied())
1763                }
1764                _ => {
1765                    return Err(ParquetError::NYI(
1766                        "Attempting to write an Arrow type that is not yet implemented".to_string(),
1767                    ));
1768                }
1769            };
1770            typed.write_batch_internal(
1771                bytes.as_slice(),
1772                None,
1773                levels.def_level_data().as_ref(),
1774                levels.rep_level_data().as_ref(),
1775                None,
1776                None,
1777                None,
1778            )
1779        }
1780    }
1781}
1782
1783fn write_primitive<E: ColumnValueEncoder>(
1784    writer: &mut GenericColumnWriter<E>,
1785    values: &E::Values,
1786    levels: &ArrayLevels,
1787) -> Result<usize> {
1788    writer.write_batch_internal(
1789        values,
1790        Some(levels.non_null_indices()),
1791        levels.def_level_data().as_ref(),
1792        levels.rep_level_data().as_ref(),
1793        None,
1794        None,
1795        None,
1796    )
1797}
1798
1799fn get_bool_array_slice(
1800    array: &arrow_array::BooleanArray,
1801    indices: impl ExactSizeIterator<Item = usize>,
1802) -> Vec<bool> {
1803    let mut values = Vec::with_capacity(indices.len());
1804    for i in indices {
1805        values.push(array.value(i))
1806    }
1807    values
1808}
1809
1810/// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each).
1811/// An Arrow YearMonth interval only stores months, thus only the first 4 bytes are populated.
1812fn get_interval_ym_array_slice(
1813    array: &arrow_array::IntervalYearMonthArray,
1814    indices: impl ExactSizeIterator<Item = usize>,
1815) -> Vec<FixedLenByteArray> {
1816    let mut values = Vec::with_capacity(indices.len());
1817    for i in indices {
1818        let mut value = array.value(i).to_le_bytes().to_vec();
1819        let mut suffix = vec![0; 8];
1820        value.append(&mut suffix);
1821        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1822    }
1823    values
1824}
1825
1826/// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each).
1827/// An Arrow DayTime interval only stores days and millis, thus the first 4 bytes are not populated.
1828fn get_interval_dt_array_slice(
1829    array: &arrow_array::IntervalDayTimeArray,
1830    indices: impl ExactSizeIterator<Item = usize>,
1831) -> Vec<FixedLenByteArray> {
1832    let mut values = Vec::with_capacity(indices.len());
1833    for i in indices {
1834        let mut out = [0; 12];
1835        let value = array.value(i);
1836        out[4..8].copy_from_slice(&value.days.to_le_bytes());
1837        out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1838        values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1839    }
1840    values
1841}
1842
1843fn get_decimal_32_array_slice(
1844    array: &arrow_array::Decimal32Array,
1845    indices: impl ExactSizeIterator<Item = usize>,
1846) -> Vec<FixedLenByteArray> {
1847    let mut values = Vec::with_capacity(indices.len());
1848    let size = decimal_length_from_precision(array.precision());
1849    for i in indices {
1850        let as_be_bytes = array.value(i).to_be_bytes();
1851        let resized_value = as_be_bytes[(4 - size)..].to_vec();
1852        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1853    }
1854    values
1855}
1856
1857fn get_decimal_64_array_slice(
1858    array: &arrow_array::Decimal64Array,
1859    indices: impl ExactSizeIterator<Item = usize>,
1860) -> Vec<FixedLenByteArray> {
1861    let mut values = Vec::with_capacity(indices.len());
1862    let size = decimal_length_from_precision(array.precision());
1863    for i in indices {
1864        let as_be_bytes = array.value(i).to_be_bytes();
1865        let resized_value = as_be_bytes[(8 - size)..].to_vec();
1866        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1867    }
1868    values
1869}
1870
1871fn get_decimal_128_array_slice(
1872    array: &arrow_array::Decimal128Array,
1873    indices: impl ExactSizeIterator<Item = usize>,
1874) -> Vec<FixedLenByteArray> {
1875    let mut values = Vec::with_capacity(indices.len());
1876    let size = decimal_length_from_precision(array.precision());
1877    for i in indices {
1878        let as_be_bytes = array.value(i).to_be_bytes();
1879        let resized_value = as_be_bytes[(16 - size)..].to_vec();
1880        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1881    }
1882    values
1883}
1884
1885fn get_decimal_256_array_slice(
1886    array: &arrow_array::Decimal256Array,
1887    indices: impl ExactSizeIterator<Item = usize>,
1888) -> Vec<FixedLenByteArray> {
1889    let mut values = Vec::with_capacity(indices.len());
1890    let size = decimal_length_from_precision(array.precision());
1891    for i in indices {
1892        let as_be_bytes = array.value(i).to_be_bytes();
1893        let resized_value = as_be_bytes[(32 - size)..].to_vec();
1894        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1895    }
1896    values
1897}
1898
1899fn get_float_16_array_slice(
1900    array: &arrow_array::Float16Array,
1901    indices: impl ExactSizeIterator<Item = usize>,
1902) -> Vec<FixedLenByteArray> {
1903    let mut values = Vec::with_capacity(indices.len());
1904    for i in indices {
1905        let value = array.value(i).to_le_bytes().to_vec();
1906        values.push(FixedLenByteArray::from(ByteArray::from(value)));
1907    }
1908    values
1909}
1910
1911fn get_fsb_array_slice(
1912    array: &arrow_array::FixedSizeBinaryArray,
1913    indices: impl ExactSizeIterator<Item = usize>,
1914) -> Vec<FixedLenByteArray> {
1915    let mut values = Vec::with_capacity(indices.len());
1916    for i in indices {
1917        let value = array.value(i).to_vec();
1918        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1919    }
1920    values
1921}
1922
1923#[cfg(test)]
1924mod tests {
1925    use super::*;
1926    use std::collections::HashMap;
1927
1928    use std::fs::File;
1929
1930    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1931    use crate::arrow::{ARROW_SCHEMA_META_KEY, PARQUET_FIELD_ID_META_KEY};
1932    use crate::column::page::{Page, PageReader};
1933    use crate::file::metadata::thrift::PageHeader;
1934    use crate::file::page_index::column_index::ColumnIndexMetaData;
1935    use crate::file::reader::SerializedPageReader;
1936    use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
1937    use crate::schema::types::ColumnPath;
1938    use arrow::datatypes::ToByteSlice;
1939    use arrow::datatypes::{DataType, Schema};
1940    use arrow::error::Result as ArrowResult;
1941    use arrow::util::data_gen::create_random_array;
1942    use arrow::util::pretty::pretty_format_batches;
1943    use arrow::{array::*, buffer::Buffer};
1944    use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer, OffsetBuffer, i256};
1945    use arrow_schema::Fields;
1946    use half::f16;
1947    use num_traits::{FromPrimitive, ToPrimitive};
1948    use tempfile::tempfile;
1949
1950    use crate::basic::Encoding;
1951    use crate::data_type::AsBytes;
1952    use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
1953    use crate::file::properties::{
1954        BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1955    };
1956    use crate::file::serialized_reader::ReadOptionsBuilder;
1957    use crate::file::{
1958        reader::{FileReader, SerializedFileReader},
1959        statistics::Statistics,
1960    };
1961
1962    /// A [`PageStore`] that allocates *sparse, non-contiguous* handles and keeps
1963    /// blobs in a `HashMap` — nothing like the default `Vec<Bytes>`. Used to
1964    /// prove the writer relies only on the opaque-handle contract and never on
1965    /// handles being dense `Vec` indices. Records how many blobs were stored.
1966    #[derive(Debug, Default)]
1967    struct RecordingPageStore {
1968        next: u64,
1969        blobs: HashMap<u64, Bytes>,
1970        puts: Arc<std::sync::atomic::AtomicUsize>,
1971    }
1972
1973    impl PageStore for RecordingPageStore {
1974        fn put(&mut self, value: Bytes) -> Result<PageKey> {
1975            // Deliberately non-sequential, never-zero handles.
1976            let id = 100 + self.next * 7;
1977            self.next += 1;
1978            self.puts.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1979            self.blobs.insert(id, value);
1980            Ok(PageKey::new(id))
1981        }
1982
1983        fn take(&mut self, key: PageKey) -> Result<Bytes> {
1984            self.blobs
1985                .remove(&key.get())
1986                .ok_or_else(|| ParquetError::General(format!("missing key {}", key.get())))
1987        }
1988    }
1989
1990    #[derive(Debug)]
1991    struct RecordingPageStoreFactory {
1992        puts: Arc<std::sync::atomic::AtomicUsize>,
1993    }
1994
1995    impl PageStoreFactory for RecordingPageStoreFactory {
1996        fn create(&self, _args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
1997            Ok(Box::new(RecordingPageStore {
1998                puts: self.puts.clone(),
1999                ..Default::default()
2000            }))
2001        }
2002    }
2003
2004    /// A custom [`PageStore`] must produce byte-identical files to the in-memory
2005    /// default, across dictionary and non-dictionary columns and multiple row
2006    /// groups (so multiple store instances are exercised).
2007    #[test]
2008    fn custom_page_store_is_byte_identical_to_default() {
2009        let schema = Arc::new(Schema::new(vec![
2010            Field::new("i", DataType::Int32, true),
2011            // A low-cardinality string column to exercise the dictionary path.
2012            Field::new("s", DataType::Utf8, true),
2013        ]));
2014        let i = Int32Array::from(vec![Some(1), None, Some(3), Some(4), Some(5), Some(6)]);
2015        let s = StringArray::from(vec![
2016            Some("a"),
2017            Some("bb"),
2018            Some("a"),
2019            None,
2020            Some("bb"),
2021            Some("ccc"),
2022        ]);
2023        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(i), Arc::new(s)]).unwrap();
2024
2025        // Small row groups so multiple column chunks (hence multiple store
2026        // instances) are produced.
2027        let props = WriterProperties::builder()
2028            .set_max_row_group_row_count(Some(3))
2029            .build();
2030
2031        let write = |factory: Option<Arc<dyn PageStoreFactory>>| {
2032            let mut buffer = Vec::new();
2033            let mut opts = ArrowWriterOptions::new().with_properties(props.clone());
2034            if let Some(factory) = factory {
2035                opts = opts.with_page_store_factory(factory);
2036            }
2037            let mut writer =
2038                ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap();
2039            writer.write(&batch).unwrap();
2040            writer.close().unwrap();
2041            buffer
2042        };
2043
2044        let default_bytes = write(None);
2045
2046        let puts = Arc::new(std::sync::atomic::AtomicUsize::new(0));
2047        let custom_bytes = write(Some(Arc::new(RecordingPageStoreFactory {
2048            puts: puts.clone(),
2049        })));
2050
2051        assert!(
2052            puts.load(std::sync::atomic::Ordering::Relaxed) > 0,
2053            "custom PageStore was never written to"
2054        );
2055        assert_eq!(
2056            default_bytes, custom_bytes,
2057            "a custom PageStore must produce byte-identical output to the default"
2058        );
2059    }
2060
2061    /// A dictionary-encoded column written through the deferred-ordering Arrow
2062    /// path must round-trip correctly even with the offset index disabled, when
2063    /// only the chunk-level dictionary/data page offsets are rewritten (there is
2064    /// no offset index to rebuild). Spans multiple data pages so the
2065    /// dictionary-first reordering is exercised.
2066    #[test]
2067    fn dictionary_column_round_trips_with_offset_index_disabled() {
2068        let schema = Arc::new(Schema::new(vec![Field::new("k", DataType::Int32, true)]));
2069
2070        // Low cardinality so the column stays dictionary-encoded; enough rows to
2071        // span several data pages within a single row group.
2072        let values: Vec<Option<i32>> = (0..50_000).map(|i| Some(i % 8)).collect();
2073        let array = Int32Array::from(values.clone());
2074        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
2075
2076        let props = WriterProperties::builder()
2077            .set_offset_index_disabled(true)
2078            .set_data_page_row_count_limit(4096)
2079            .build();
2080        let opts = ArrowWriterOptions::new().with_properties(props);
2081
2082        let mut buffer = Vec::new();
2083        let mut writer =
2084            ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap();
2085        writer.write(&batch).unwrap();
2086        writer.close().unwrap();
2087
2088        let reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), values.len()).unwrap();
2089        let read: Vec<RecordBatch> = reader.collect::<ArrowResult<_>>().unwrap();
2090        let read_values: Vec<Option<i32>> = read
2091            .iter()
2092            .flat_map(|b| b.column(0).as_primitive::<Int32Type>().iter())
2093            .collect();
2094        assert_eq!(read_values, values);
2095    }
2096
2097    #[test]
2098    fn arrow_writer() {
2099        // define schema
2100        let schema = Schema::new(vec![
2101            Field::new("a", DataType::Int32, false),
2102            Field::new("b", DataType::Int32, true),
2103        ]);
2104
2105        // create some data
2106        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2107        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2108
2109        // build a record batch
2110        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
2111
2112        roundtrip(batch, Some(SMALL_SIZE / 2));
2113    }
2114
2115    fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
2116        let mut buffer = vec![];
2117
2118        let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
2119        writer.write(expected_batch).unwrap();
2120        writer.close().unwrap();
2121
2122        buffer
2123    }
2124
2125    fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
2126        let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
2127        writer.write(expected_batch).unwrap();
2128        writer.into_inner().unwrap()
2129    }
2130
2131    #[test]
2132    fn roundtrip_bytes() {
2133        // define schema
2134        let schema = Arc::new(Schema::new(vec![
2135            Field::new("a", DataType::Int32, false),
2136            Field::new("b", DataType::Int32, true),
2137        ]));
2138
2139        // create some data
2140        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2141        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2142
2143        // build a record batch
2144        let expected_batch =
2145            RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
2146
2147        for buffer in [
2148            get_bytes_after_close(schema.clone(), &expected_batch),
2149            get_bytes_by_into_inner(schema, &expected_batch),
2150        ] {
2151            let cursor = Bytes::from(buffer);
2152            let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
2153
2154            let actual_batch = record_batch_reader
2155                .next()
2156                .expect("No batch found")
2157                .expect("Unable to get batch");
2158
2159            assert_eq!(expected_batch.schema(), actual_batch.schema());
2160            assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
2161            assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
2162            for i in 0..expected_batch.num_columns() {
2163                let expected_data = expected_batch.column(i).to_data();
2164                let actual_data = actual_batch.column(i).to_data();
2165
2166                assert_eq!(expected_data, actual_data);
2167            }
2168        }
2169    }
2170
2171    #[test]
2172    fn arrow_writer_non_null() {
2173        // define schema
2174        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2175
2176        // create some data
2177        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2178
2179        // build a record batch
2180        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2181
2182        roundtrip(batch, Some(SMALL_SIZE / 2));
2183    }
2184
2185    #[test]
2186    fn arrow_writer_list() {
2187        // define schema
2188        let schema = Schema::new(vec![Field::new(
2189            "a",
2190            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
2191            true,
2192        )]);
2193
2194        // create some data
2195        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2196
2197        // Construct a buffer for value offsets, for the nested array:
2198        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
2199        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2200
2201        // Construct a list array from the above two
2202        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2203            DataType::Int32,
2204            false,
2205        ))))
2206        .len(5)
2207        .add_buffer(a_value_offsets)
2208        .add_child_data(a_values.into_data())
2209        .null_bit_buffer(Some(Buffer::from([0b00011011])))
2210        .build()
2211        .unwrap();
2212        let a = ListArray::from(a_list_data);
2213
2214        // build a record batch
2215        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2216
2217        assert_eq!(batch.column(0).null_count(), 1);
2218
2219        // This test fails if the max row group size is less than the batch's length
2220        // see https://github.com/apache/arrow-rs/issues/518
2221        roundtrip(batch, None);
2222    }
2223
2224    #[test]
2225    fn arrow_writer_list_non_null() {
2226        // define schema
2227        let schema = Schema::new(vec![Field::new(
2228            "a",
2229            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
2230            false,
2231        )]);
2232
2233        // create some data
2234        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2235
2236        // Construct a buffer for value offsets, for the nested array:
2237        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
2238        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2239
2240        // Construct a list array from the above two
2241        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2242            DataType::Int32,
2243            false,
2244        ))))
2245        .len(5)
2246        .add_buffer(a_value_offsets)
2247        .add_child_data(a_values.into_data())
2248        .build()
2249        .unwrap();
2250        let a = ListArray::from(a_list_data);
2251
2252        // build a record batch
2253        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2254
2255        // This test fails if the max row group size is less than the batch's length
2256        // see https://github.com/apache/arrow-rs/issues/518
2257        assert_eq!(batch.column(0).null_count(), 0);
2258
2259        roundtrip(batch, None);
2260    }
2261
2262    #[test]
2263    fn arrow_writer_list_view() {
2264        let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2265        let schema = Schema::new(vec![Field::new(
2266            "a",
2267            DataType::ListView(list_field.clone()),
2268            true,
2269        )]);
2270
2271        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
2272        let a = ListViewArray::new(
2273            list_field,
2274            vec![0, 1, 0, 3, 6].into(),
2275            vec![1, 2, 0, 3, 4].into(),
2276            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2277            Some(vec![true, true, false, true, true].into()),
2278        );
2279
2280        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2281
2282        assert_eq!(batch.column(0).null_count(), 1);
2283
2284        roundtrip(batch, None);
2285    }
2286
2287    #[test]
2288    fn arrow_writer_list_view_non_null() {
2289        let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2290        let schema = Schema::new(vec![Field::new(
2291            "a",
2292            DataType::ListView(list_field.clone()),
2293            false,
2294        )]);
2295
2296        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
2297        let a = ListViewArray::new(
2298            list_field,
2299            vec![0, 1, 0, 3, 6].into(),
2300            vec![1, 2, 0, 3, 4].into(),
2301            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2302            None,
2303        );
2304
2305        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2306
2307        assert_eq!(batch.column(0).null_count(), 0);
2308
2309        roundtrip(batch, None);
2310    }
2311
2312    #[test]
2313    fn arrow_writer_list_view_out_of_order() {
2314        let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2315        let schema = Schema::new(vec![Field::new(
2316            "a",
2317            DataType::ListView(list_field.clone()),
2318            false,
2319        )]);
2320
2321        // [[1], [2, 3], [], [7, 8, 9, 10], [4, 5, 6]] - out of order offsets
2322        let a = ListViewArray::new(
2323            list_field,
2324            vec![0, 1, 0, 6, 3].into(),
2325            vec![1, 2, 0, 4, 3].into(),
2326            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2327            None,
2328        );
2329
2330        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2331
2332        roundtrip(batch, None);
2333    }
2334
2335    #[test]
2336    fn arrow_writer_large_list_view() {
2337        let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2338        let schema = Schema::new(vec![Field::new(
2339            "a",
2340            DataType::LargeListView(list_field.clone()),
2341            true,
2342        )]);
2343
2344        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
2345        let a = LargeListViewArray::new(
2346            list_field,
2347            vec![0i64, 1, 0, 3, 6].into(),
2348            vec![1i64, 2, 0, 3, 4].into(),
2349            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2350            Some(vec![true, true, false, true, true].into()),
2351        );
2352
2353        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2354
2355        assert_eq!(batch.column(0).null_count(), 1);
2356
2357        roundtrip(batch, None);
2358    }
2359
2360    #[test]
2361    fn arrow_writer_list_view_with_struct() {
2362        // Test ListView containing Struct: ListView<Struct<Int32, Utf8>>
2363        let struct_fields = Fields::from(vec![
2364            Field::new("id", DataType::Int32, false),
2365            Field::new("name", DataType::Utf8, false),
2366        ]);
2367        let struct_type = DataType::Struct(struct_fields.clone());
2368        let list_field = Arc::new(Field::new("item", struct_type.clone(), false));
2369
2370        let schema = Schema::new(vec![Field::new(
2371            "a",
2372            DataType::ListView(list_field.clone()),
2373            true,
2374        )]);
2375
2376        // Create struct values
2377        let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
2378        let name_array = StringArray::from(vec!["a", "b", "c", "d", "e"]);
2379        let struct_array = StructArray::new(
2380            struct_fields,
2381            vec![Arc::new(id_array), Arc::new(name_array)],
2382            None,
2383        );
2384
2385        // Create ListView: [{1, "a"}, {2, "b"}], null, [{3, "c"}, {4, "d"}, {5, "e"}]
2386        let list_view = ListViewArray::new(
2387            list_field,
2388            vec![0, 2, 2].into(), // offsets
2389            vec![2, 0, 3].into(), // sizes
2390            Arc::new(struct_array),
2391            Some(vec![true, false, true].into()),
2392        );
2393
2394        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_view)]).unwrap();
2395
2396        roundtrip(batch, None);
2397    }
2398
2399    #[test]
2400    fn arrow_writer_binary() {
2401        let string_field = Field::new("a", DataType::Utf8, false);
2402        let binary_field = Field::new("b", DataType::Binary, false);
2403        let schema = Schema::new(vec![string_field, binary_field]);
2404
2405        let raw_string_values = vec!["foo", "bar", "baz", "quux"];
2406        let raw_binary_values = [
2407            b"foo".to_vec(),
2408            b"bar".to_vec(),
2409            b"baz".to_vec(),
2410            b"quux".to_vec(),
2411        ];
2412        let raw_binary_value_refs = raw_binary_values
2413            .iter()
2414            .map(|x| x.as_slice())
2415            .collect::<Vec<_>>();
2416
2417        let string_values = StringArray::from(raw_string_values.clone());
2418        let binary_values = BinaryArray::from(raw_binary_value_refs);
2419        let batch = RecordBatch::try_new(
2420            Arc::new(schema),
2421            vec![Arc::new(string_values), Arc::new(binary_values)],
2422        )
2423        .unwrap();
2424
2425        roundtrip(batch, Some(SMALL_SIZE / 2));
2426    }
2427
2428    #[test]
2429    fn arrow_writer_binary_view() {
2430        let string_field = Field::new("a", DataType::Utf8View, false);
2431        let binary_field = Field::new("b", DataType::BinaryView, false);
2432        let nullable_string_field = Field::new("a", DataType::Utf8View, true);
2433        let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
2434
2435        let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
2436        let raw_binary_values = vec![
2437            b"foo".to_vec(),
2438            b"bar".to_vec(),
2439            b"large payload over 12 bytes".to_vec(),
2440            b"lulu".to_vec(),
2441        ];
2442        let nullable_string_values =
2443            vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
2444
2445        let string_view_values = StringViewArray::from(raw_string_values);
2446        let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
2447        let nullable_string_view_values = StringViewArray::from(nullable_string_values);
2448        let batch = RecordBatch::try_new(
2449            Arc::new(schema),
2450            vec![
2451                Arc::new(string_view_values),
2452                Arc::new(binary_view_values),
2453                Arc::new(nullable_string_view_values),
2454            ],
2455        )
2456        .unwrap();
2457
2458        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
2459        roundtrip(batch, None);
2460    }
2461
2462    #[test]
2463    fn arrow_writer_binary_view_long_value() {
2464        let string_field = Field::new("a", DataType::Utf8View, false);
2465        let binary_field = Field::new("b", DataType::BinaryView, false);
2466        let schema = Schema::new(vec![string_field, binary_field]);
2467
2468        // There is special case validation for long values (greater than 128)
2469        // 128 encodes as 0x80 0x00 0x00 0x00 in little endian, which should
2470        // trigger the long-string UTF-8 validation branch in the plain decoder.
2471        let long = "a".repeat(128);
2472        let raw_string_values = vec!["foo", long.as_str(), "bar"];
2473        let raw_binary_values = vec![b"foo".to_vec(), long.as_bytes().to_vec(), b"bar".to_vec()];
2474
2475        let string_view_values: ArrayRef = Arc::new(StringViewArray::from(raw_string_values));
2476        let binary_view_values: ArrayRef =
2477            Arc::new(BinaryViewArray::from_iter_values(raw_binary_values));
2478
2479        one_column_roundtrip(Arc::clone(&string_view_values), false);
2480        one_column_roundtrip(Arc::clone(&binary_view_values), false);
2481
2482        let batch = RecordBatch::try_new(
2483            Arc::new(schema),
2484            vec![string_view_values, binary_view_values],
2485        )
2486        .unwrap();
2487
2488        // Disable dictionary to exercise plain encoding paths in the reader.
2489        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2490            let props = WriterProperties::builder()
2491                .set_writer_version(version)
2492                .set_dictionary_enabled(false)
2493                .build();
2494            roundtrip_opts(&batch, props);
2495        }
2496    }
2497
2498    fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
2499        let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
2500        let schema = Schema::new(vec![decimal_field]);
2501
2502        let decimal_values = vec![10_000, 50_000, 0, -100]
2503            .into_iter()
2504            .map(Some)
2505            .collect::<Decimal128Array>()
2506            .with_precision_and_scale(precision, scale)
2507            .unwrap();
2508
2509        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
2510    }
2511
2512    #[test]
2513    fn arrow_writer_decimal() {
2514        // int32 to store the decimal value
2515        let batch_int32_decimal = get_decimal_batch(5, 2);
2516        roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
2517        // int64 to store the decimal value
2518        let batch_int64_decimal = get_decimal_batch(12, 2);
2519        roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
2520        // fixed_length_byte_array to store the decimal value
2521        let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
2522        roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
2523    }
2524
2525    #[test]
2526    fn arrow_writer_complex() {
2527        // define schema
2528        let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
2529        let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
2530        let struct_field_g = Arc::new(Field::new_list(
2531            "g",
2532            Field::new_list_field(DataType::Int16, true),
2533            false,
2534        ));
2535        let struct_field_h = Arc::new(Field::new_list(
2536            "h",
2537            Field::new_list_field(DataType::Int16, false),
2538            true,
2539        ));
2540        let struct_field_e = Arc::new(Field::new_struct(
2541            "e",
2542            vec![
2543                struct_field_f.clone(),
2544                struct_field_g.clone(),
2545                struct_field_h.clone(),
2546            ],
2547            false,
2548        ));
2549        let schema = Schema::new(vec![
2550            Field::new("a", DataType::Int32, false),
2551            Field::new("b", DataType::Int32, true),
2552            Field::new_struct(
2553                "c",
2554                vec![struct_field_d.clone(), struct_field_e.clone()],
2555                false,
2556            ),
2557        ]);
2558
2559        // create some data
2560        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2561        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2562        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
2563        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
2564
2565        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2566
2567        // Construct a buffer for value offsets, for the nested array:
2568        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
2569        let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2570
2571        // Construct a list array from the above two
2572        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
2573            .len(5)
2574            .add_buffer(g_value_offsets.clone())
2575            .add_child_data(g_value.to_data())
2576            .build()
2577            .unwrap();
2578        let g = ListArray::from(g_list_data);
2579        // The difference between g and h is that h has a null bitmap
2580        let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
2581            .len(5)
2582            .add_buffer(g_value_offsets)
2583            .add_child_data(g_value.to_data())
2584            .null_bit_buffer(Some(Buffer::from([0b00011011])))
2585            .build()
2586            .unwrap();
2587        let h = ListArray::from(h_list_data);
2588
2589        let e = StructArray::from(vec![
2590            (struct_field_f, Arc::new(f) as ArrayRef),
2591            (struct_field_g, Arc::new(g) as ArrayRef),
2592            (struct_field_h, Arc::new(h) as ArrayRef),
2593        ]);
2594
2595        let c = StructArray::from(vec![
2596            (struct_field_d, Arc::new(d) as ArrayRef),
2597            (struct_field_e, Arc::new(e) as ArrayRef),
2598        ]);
2599
2600        // build a record batch
2601        let batch = RecordBatch::try_new(
2602            Arc::new(schema),
2603            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
2604        )
2605        .unwrap();
2606
2607        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
2608        roundtrip(batch, Some(SMALL_SIZE / 3));
2609    }
2610
2611    #[test]
2612    fn arrow_writer_complex_mixed() {
2613        // This test was added while investigating https://github.com/apache/arrow-rs/issues/244.
2614        // It was subsequently fixed while investigating https://github.com/apache/arrow-rs/issues/245.
2615
2616        // define schema
2617        let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
2618        let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
2619        let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
2620        let schema = Schema::new(vec![Field::new(
2621            "some_nested_object",
2622            DataType::Struct(Fields::from(vec![
2623                offset_field.clone(),
2624                partition_field.clone(),
2625                topic_field.clone(),
2626            ])),
2627            false,
2628        )]);
2629
2630        // create some data
2631        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
2632        let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2633        let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
2634
2635        let some_nested_object = StructArray::from(vec![
2636            (offset_field, Arc::new(offset) as ArrayRef),
2637            (partition_field, Arc::new(partition) as ArrayRef),
2638            (topic_field, Arc::new(topic) as ArrayRef),
2639        ]);
2640
2641        // build a record batch
2642        let batch =
2643            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
2644
2645        roundtrip(batch, Some(SMALL_SIZE / 2));
2646    }
2647
2648    #[test]
2649    fn arrow_writer_map() {
2650        // Note: we are using the JSON Arrow reader for brevity
2651        let json_content = r#"
2652        {"stocks":{"long": "$AAA", "short": "$BBB"}}
2653        {"stocks":{"long": null, "long": "$CCC", "short": null}}
2654        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
2655        "#;
2656        let entries_struct_type = DataType::Struct(Fields::from(vec![
2657            Field::new("key", DataType::Utf8, false),
2658            Field::new("value", DataType::Utf8, true),
2659        ]));
2660        let stocks_field = Field::new(
2661            "stocks",
2662            DataType::Map(
2663                Arc::new(Field::new("entries", entries_struct_type, false)),
2664                false,
2665            ),
2666            true,
2667        );
2668        let schema = Arc::new(Schema::new(vec![stocks_field]));
2669        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
2670        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
2671
2672        let batch = reader.next().unwrap().unwrap();
2673        roundtrip(batch, None);
2674    }
2675
2676    #[test]
2677    fn arrow_writer_2_level_struct() {
2678        // tests writing <struct<struct<primitive>>
2679        let field_c = Field::new("c", DataType::Int32, true);
2680        let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
2681        let type_a = DataType::Struct(vec![field_b.clone()].into());
2682        let field_a = Field::new("a", type_a, true);
2683        let schema = Schema::new(vec![field_a.clone()]);
2684
2685        // create data
2686        let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
2687        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2688            .len(6)
2689            .null_bit_buffer(Some(Buffer::from([0b00100111])))
2690            .add_child_data(c.into_data())
2691            .build()
2692            .unwrap();
2693        let b = StructArray::from(b_data);
2694        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2695            .len(6)
2696            .null_bit_buffer(Some(Buffer::from([0b00101111])))
2697            .add_child_data(b.into_data())
2698            .build()
2699            .unwrap();
2700        let a = StructArray::from(a_data);
2701
2702        assert_eq!(a.null_count(), 1);
2703        assert_eq!(a.column(0).null_count(), 2);
2704
2705        // build a racord batch
2706        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2707
2708        roundtrip(batch, Some(SMALL_SIZE / 2));
2709    }
2710
2711    #[test]
2712    fn arrow_writer_2_level_struct_non_null() {
2713        // tests writing <struct<struct<primitive>>
2714        let field_c = Field::new("c", DataType::Int32, false);
2715        let type_b = DataType::Struct(vec![field_c].into());
2716        let field_b = Field::new("b", type_b.clone(), false);
2717        let type_a = DataType::Struct(vec![field_b].into());
2718        let field_a = Field::new("a", type_a.clone(), false);
2719        let schema = Schema::new(vec![field_a]);
2720
2721        // create data
2722        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2723        let b_data = ArrayDataBuilder::new(type_b)
2724            .len(6)
2725            .add_child_data(c.into_data())
2726            .build()
2727            .unwrap();
2728        let b = StructArray::from(b_data);
2729        let a_data = ArrayDataBuilder::new(type_a)
2730            .len(6)
2731            .add_child_data(b.into_data())
2732            .build()
2733            .unwrap();
2734        let a = StructArray::from(a_data);
2735
2736        assert_eq!(a.null_count(), 0);
2737        assert_eq!(a.column(0).null_count(), 0);
2738
2739        // build a racord batch
2740        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2741
2742        roundtrip(batch, Some(SMALL_SIZE / 2));
2743    }
2744
2745    #[test]
2746    fn arrow_writer_2_level_struct_mixed_null() {
2747        // tests writing <struct<struct<primitive>>
2748        let field_c = Field::new("c", DataType::Int32, false);
2749        let type_b = DataType::Struct(vec![field_c].into());
2750        let field_b = Field::new("b", type_b.clone(), true);
2751        let type_a = DataType::Struct(vec![field_b].into());
2752        let field_a = Field::new("a", type_a.clone(), false);
2753        let schema = Schema::new(vec![field_a]);
2754
2755        // create data
2756        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2757        let b_data = ArrayDataBuilder::new(type_b)
2758            .len(6)
2759            .null_bit_buffer(Some(Buffer::from([0b00100111])))
2760            .add_child_data(c.into_data())
2761            .build()
2762            .unwrap();
2763        let b = StructArray::from(b_data);
2764        // a intentionally has no null buffer, to test that this is handled correctly
2765        let a_data = ArrayDataBuilder::new(type_a)
2766            .len(6)
2767            .add_child_data(b.into_data())
2768            .build()
2769            .unwrap();
2770        let a = StructArray::from(a_data);
2771
2772        assert_eq!(a.null_count(), 0);
2773        assert_eq!(a.column(0).null_count(), 2);
2774
2775        // build a racord batch
2776        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2777
2778        roundtrip(batch, Some(SMALL_SIZE / 2));
2779    }
2780
2781    #[test]
2782    fn arrow_writer_2_level_struct_mixed_null_2() {
2783        // tests writing <struct<struct<primitive>>, where the primitive columns are non-null.
2784        let field_c = Field::new("c", DataType::Int32, false);
2785        let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
2786        let field_e = Field::new(
2787            "e",
2788            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2789            false,
2790        );
2791
2792        let field_b = Field::new(
2793            "b",
2794            DataType::Struct(vec![field_c, field_d, field_e].into()),
2795            false,
2796        );
2797        let type_a = DataType::Struct(vec![field_b.clone()].into());
2798        let field_a = Field::new("a", type_a, true);
2799        let schema = Schema::new(vec![field_a.clone()]);
2800
2801        // create data
2802        let c = Int32Array::from_iter_values(0..6);
2803        let d = FixedSizeBinaryArray::try_from_iter(
2804            ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
2805        )
2806        .expect("four byte values");
2807        let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
2808        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2809            .len(6)
2810            .add_child_data(c.into_data())
2811            .add_child_data(d.into_data())
2812            .add_child_data(e.into_data())
2813            .build()
2814            .unwrap();
2815        let b = StructArray::from(b_data);
2816        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2817            .len(6)
2818            .null_bit_buffer(Some(Buffer::from([0b00100101])))
2819            .add_child_data(b.into_data())
2820            .build()
2821            .unwrap();
2822        let a = StructArray::from(a_data);
2823
2824        assert_eq!(a.null_count(), 3);
2825        assert_eq!(a.column(0).null_count(), 0);
2826
2827        // build a record batch
2828        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2829
2830        roundtrip(batch, Some(SMALL_SIZE / 2));
2831    }
2832
2833    #[test]
2834    fn test_fixed_size_binary_in_dict() {
2835        fn test_fixed_size_binary_in_dict_inner<K>()
2836        where
2837            K: ArrowDictionaryKeyType,
2838            K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
2839            <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
2840        {
2841            let field = Field::new(
2842                "a",
2843                DataType::Dictionary(
2844                    Box::new(K::DATA_TYPE),
2845                    Box::new(DataType::FixedSizeBinary(4)),
2846                ),
2847                false,
2848            );
2849            let schema = Schema::new(vec![field]);
2850
2851            let keys: Vec<K::Native> = vec![
2852                K::Native::try_from(0u8).unwrap(),
2853                K::Native::try_from(0u8).unwrap(),
2854                K::Native::try_from(1u8).unwrap(),
2855            ];
2856            let keys = PrimitiveArray::<K>::from_iter_values(keys);
2857            let values = FixedSizeBinaryArray::try_from_iter(
2858                vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
2859            )
2860            .unwrap();
2861
2862            let data = DictionaryArray::<K>::new(keys, Arc::new(values));
2863            let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
2864            roundtrip(batch, None);
2865        }
2866
2867        test_fixed_size_binary_in_dict_inner::<UInt8Type>();
2868        test_fixed_size_binary_in_dict_inner::<UInt16Type>();
2869        test_fixed_size_binary_in_dict_inner::<UInt32Type>();
2870        test_fixed_size_binary_in_dict_inner::<UInt16Type>();
2871        test_fixed_size_binary_in_dict_inner::<Int8Type>();
2872        test_fixed_size_binary_in_dict_inner::<Int16Type>();
2873        test_fixed_size_binary_in_dict_inner::<Int32Type>();
2874        test_fixed_size_binary_in_dict_inner::<Int64Type>();
2875    }
2876
2877    #[test]
2878    fn test_empty_dict() {
2879        let struct_fields = Fields::from(vec![Field::new(
2880            "dict",
2881            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2882            false,
2883        )]);
2884
2885        let schema = Schema::new(vec![Field::new_struct(
2886            "struct",
2887            struct_fields.clone(),
2888            true,
2889        )]);
2890        let dictionary = Arc::new(DictionaryArray::new(
2891            Int32Array::new_null(5),
2892            Arc::new(StringArray::new_null(0)),
2893        ));
2894
2895        let s = StructArray::new(
2896            struct_fields,
2897            vec![dictionary],
2898            Some(NullBuffer::new_null(5)),
2899        );
2900
2901        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
2902        roundtrip(batch, None);
2903    }
2904    #[test]
2905    fn arrow_writer_page_size() {
2906        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
2907
2908        let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
2909
2910        // Generate an array of 10 unique 10 character string
2911        for i in 0..10 {
2912            let value = i
2913                .to_string()
2914                .repeat(10)
2915                .chars()
2916                .take(10)
2917                .collect::<String>();
2918
2919            builder.append_value(value);
2920        }
2921
2922        let array = Arc::new(builder.finish());
2923
2924        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
2925
2926        let file = tempfile::tempfile().unwrap();
2927
2928        // Set everything very low so we fallback to PLAIN encoding after the first row
2929        let props = WriterProperties::builder()
2930            .set_data_page_size_limit(1)
2931            .set_dictionary_page_size_limit(1)
2932            .set_write_batch_size(1)
2933            .build();
2934
2935        let mut writer =
2936            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
2937                .expect("Unable to write file");
2938        writer.write(&batch).unwrap();
2939        writer.close().unwrap();
2940
2941        let options = ReadOptionsBuilder::new().with_page_index().build();
2942        let reader =
2943            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap();
2944
2945        let column = reader.metadata().row_group(0).columns();
2946
2947        assert_eq!(column.len(), 1);
2948
2949        // We should write one row before falling back to PLAIN encoding so there should still be a
2950        // dictionary page.
2951        assert!(
2952            column[0].dictionary_page_offset().is_some(),
2953            "Expected a dictionary page"
2954        );
2955
2956        assert!(reader.metadata().offset_index().is_some());
2957        let offset_indexes = &reader.metadata().offset_index().unwrap()[0];
2958
2959        let page_locations = offset_indexes[0].page_locations.clone();
2960
2961        // We should fallback to PLAIN encoding after the first row and our max page size is 1 bytes
2962        // so we expect one dictionary encoded page and then a page per row thereafter.
2963        assert_eq!(
2964            page_locations.len(),
2965            10,
2966            "Expected 10 pages but got {page_locations:#?}"
2967        );
2968    }
2969
2970    #[test]
2971    fn arrow_writer_float_nans() {
2972        let f16_field = Field::new("a", DataType::Float16, false);
2973        let f32_field = Field::new("b", DataType::Float32, false);
2974        let f64_field = Field::new("c", DataType::Float64, false);
2975        let schema = Schema::new(vec![f16_field, f32_field, f64_field]);
2976
2977        let f16_values = (0..MEDIUM_SIZE)
2978            .map(|i| {
2979                Some(if i % 2 == 0 {
2980                    f16::NAN
2981                } else {
2982                    f16::from_f32(i as f32)
2983                })
2984            })
2985            .collect::<Float16Array>();
2986
2987        let f32_values = (0..MEDIUM_SIZE)
2988            .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
2989            .collect::<Float32Array>();
2990
2991        let f64_values = (0..MEDIUM_SIZE)
2992            .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
2993            .collect::<Float64Array>();
2994
2995        let batch = RecordBatch::try_new(
2996            Arc::new(schema),
2997            vec![
2998                Arc::new(f16_values),
2999                Arc::new(f32_values),
3000                Arc::new(f64_values),
3001            ],
3002        )
3003        .unwrap();
3004
3005        roundtrip(batch, None);
3006    }
3007
    // Base size used by the tests in this module to derive small row group limits
    // (e.g. `SMALL_SIZE / 2`, `SMALL_SIZE / 3`).
    const SMALL_SIZE: usize = 7;
    // Number of values generated per column by tests that iterate `0..MEDIUM_SIZE`.
    const MEDIUM_SIZE: usize = 63;
3010
3011    // Write the batch to parquet and read it back out, ensuring
3012    // that what comes out is the same as what was written in
3013    fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<Bytes> {
3014        let mut files = vec![];
3015        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
3016            let mut props = WriterProperties::builder().set_writer_version(version);
3017
3018            if let Some(size) = max_row_group_size {
3019                props = props.set_max_row_group_row_count(Some(size))
3020            }
3021
3022            let props = props.build();
3023            files.push(roundtrip_opts(&expected_batch, props))
3024        }
3025        files
3026    }
3027
3028    // Round trip the specified record batch with the specified writer properties,
3029    // to an in-memory file, and validate the arrays using the specified function.
3030    // Returns the in-memory file.
3031    fn roundtrip_opts_with_array_validation<F>(
3032        expected_batch: &RecordBatch,
3033        props: WriterProperties,
3034        validate: F,
3035    ) -> Bytes
3036    where
3037        F: Fn(&ArrayData, &ArrayData),
3038    {
3039        let mut file = vec![];
3040
3041        let mut writer = ArrowWriter::try_new(&mut file, expected_batch.schema(), Some(props))
3042            .expect("Unable to write file");
3043        writer.write(expected_batch).unwrap();
3044        writer.close().unwrap();
3045
3046        let file = Bytes::from(file);
3047        let mut record_batch_reader =
3048            ParquetRecordBatchReader::try_new(file.clone(), 1024).unwrap();
3049
3050        let actual_batch = record_batch_reader
3051            .next()
3052            .expect("No batch found")
3053            .expect("Unable to get batch");
3054
3055        assert_eq!(expected_batch.schema(), actual_batch.schema());
3056        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
3057        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
3058        for i in 0..expected_batch.num_columns() {
3059            let expected_data = expected_batch.column(i).to_data();
3060            let actual_data = actual_batch.column(i).to_data();
3061            validate(&expected_data, &actual_data);
3062        }
3063
3064        file
3065    }
3066
3067    fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> Bytes {
3068        roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
3069            a.validate_full().expect("valid expected data");
3070            b.validate_full().expect("valid actual data");
3071            assert_eq!(a, b)
3072        })
3073    }
3074
3075    struct RoundTripOptions {
3076        values: ArrayRef,
3077        schema: SchemaRef,
3078        bloom_filter: bool,
3079        bloom_filter_ndv: Option<u64>,
3080        bloom_filter_position: BloomFilterPosition,
3081    }
3082
3083    impl RoundTripOptions {
3084        fn new(values: ArrayRef, nullable: bool) -> Self {
3085            let data_type = values.data_type().clone();
3086            let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
3087            Self {
3088                values,
3089                schema: Arc::new(schema),
3090                bloom_filter: false,
3091                bloom_filter_ndv: None,
3092                bloom_filter_position: BloomFilterPosition::AfterRowGroup,
3093            }
3094        }
3095    }
3096
3097    fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<Bytes> {
3098        one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
3099    }
3100
3101    fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<Bytes> {
3102        let mut options = RoundTripOptions::new(values, false);
3103        options.schema = schema;
3104        one_column_roundtrip_with_options(options)
3105    }
3106
3107    fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<Bytes> {
3108        let RoundTripOptions {
3109            values,
3110            schema,
3111            bloom_filter,
3112            bloom_filter_ndv,
3113            bloom_filter_position,
3114        } = options;
3115
3116        let encodings = match values.data_type() {
3117            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
3118                vec![
3119                    Encoding::PLAIN,
3120                    Encoding::DELTA_BYTE_ARRAY,
3121                    Encoding::DELTA_LENGTH_BYTE_ARRAY,
3122                ]
3123            }
3124            DataType::Int64
3125            | DataType::Int32
3126            | DataType::Int16
3127            | DataType::Int8
3128            | DataType::UInt64
3129            | DataType::UInt32
3130            | DataType::UInt16
3131            | DataType::UInt8 => vec![
3132                Encoding::PLAIN,
3133                Encoding::DELTA_BINARY_PACKED,
3134                Encoding::BYTE_STREAM_SPLIT,
3135            ],
3136            DataType::Float32 | DataType::Float64 => {
3137                vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
3138            }
3139            _ => vec![Encoding::PLAIN],
3140        };
3141
3142        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3143
3144        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
3145
3146        let mut files = vec![];
3147        for dictionary_size in [0, 1, 1024] {
3148            for encoding in &encodings {
3149                for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
3150                    for row_group_size in row_group_sizes {
3151                        let mut builder = WriterProperties::builder()
3152                            .set_writer_version(version)
3153                            .set_max_row_group_row_count(Some(row_group_size))
3154                            .set_dictionary_enabled(dictionary_size != 0)
3155                            .set_dictionary_page_size_limit(dictionary_size.max(1))
3156                            .set_encoding(*encoding)
3157                            .set_bloom_filter_enabled(bloom_filter)
3158                            .set_bloom_filter_position(bloom_filter_position);
3159                        if let Some(ndv) = bloom_filter_ndv {
3160                            builder = builder.set_bloom_filter_max_ndv(ndv);
3161                        }
3162                        let props = builder.build();
3163
3164                        files.push(roundtrip_opts(&expected_batch, props))
3165                    }
3166                }
3167            }
3168        }
3169        files
3170    }
3171
3172    fn values_required<A, I>(iter: I) -> Vec<Bytes>
3173    where
3174        A: From<Vec<I::Item>> + Array + 'static,
3175        I: IntoIterator,
3176    {
3177        let raw_values: Vec<_> = iter.into_iter().collect();
3178        let values = Arc::new(A::from(raw_values));
3179        one_column_roundtrip(values, false)
3180    }
3181
3182    fn values_optional<A, I>(iter: I) -> Vec<Bytes>
3183    where
3184        A: From<Vec<Option<I::Item>>> + Array + 'static,
3185        I: IntoIterator,
3186    {
3187        let optional_raw_values: Vec<_> = iter
3188            .into_iter()
3189            .enumerate()
3190            .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
3191            .collect();
3192        let optional_values = Arc::new(A::from(optional_raw_values));
3193        one_column_roundtrip(optional_values, true)
3194    }
3195
3196    fn required_and_optional<A, I>(iter: I)
3197    where
3198        A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
3199        I: IntoIterator + Clone,
3200    {
3201        values_required::<A, I>(iter.clone());
3202        values_optional::<A, I>(iter);
3203    }
3204
3205    fn check_bloom_filter<T: AsBytes>(
3206        files: Vec<Bytes>,
3207        file_column: String,
3208        positive_values: Vec<T>,
3209        negative_values: Vec<T>,
3210    ) {
3211        files.into_iter().take(1).for_each(|file| {
3212            let file_reader = SerializedFileReader::new_with_options(
3213                file,
3214                ReadOptionsBuilder::new()
3215                    .with_reader_properties(
3216                        ReaderProperties::builder()
3217                            .set_read_bloom_filter(true)
3218                            .build(),
3219                    )
3220                    .build(),
3221            )
3222            .expect("Unable to open file as Parquet");
3223            let metadata = file_reader.metadata();
3224
3225            // Gets bloom filters from all row groups.
3226            let mut bloom_filters: Vec<_> = vec![];
3227            for (ri, row_group) in metadata.row_groups().iter().enumerate() {
3228                if let Some((column_index, _)) = row_group
3229                    .columns()
3230                    .iter()
3231                    .enumerate()
3232                    .find(|(_, column)| column.column_path().string() == file_column)
3233                {
3234                    let row_group_reader = file_reader
3235                        .get_row_group(ri)
3236                        .expect("Unable to read row group");
3237                    if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
3238                        bloom_filters.push(sbbf.clone());
3239                    } else {
3240                        panic!("No bloom filter for column named {file_column} found");
3241                    }
3242                } else {
3243                    panic!("No column named {file_column} found");
3244                }
3245            }
3246
3247            positive_values.iter().for_each(|value| {
3248                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
3249                assert!(
3250                    found.is_some(),
3251                    "{}",
3252                    format!("Value {:?} should be in bloom filter", value.as_bytes())
3253                );
3254            });
3255
3256            negative_values.iter().for_each(|value| {
3257                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
3258                assert!(
3259                    found.is_none(),
3260                    "{}",
3261                    format!("Value {:?} should not be in bloom filter", value.as_bytes())
3262                );
3263            });
3264        });
3265    }
3266
3267    #[test]
3268    fn all_null_primitive_single_column() {
3269        let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
3270        one_column_roundtrip(values, true);
3271    }
3272    #[test]
3273    fn null_single_column() {
3274        let values = Arc::new(NullArray::new(SMALL_SIZE));
3275        one_column_roundtrip(values, true);
3276        // null arrays are always nullable, a test with non-nullable nulls fails
3277    }
3278
3279    #[test]
3280    fn bool_single_column() {
3281        required_and_optional::<BooleanArray, _>(
3282            [true, false].iter().cycle().copied().take(SMALL_SIZE),
3283        );
3284    }
3285
3286    #[test]
3287    fn bool_large_single_column() {
3288        let values = Arc::new(
3289            [None, Some(true), Some(false)]
3290                .iter()
3291                .cycle()
3292                .copied()
3293                .take(200_000)
3294                .collect::<BooleanArray>(),
3295        );
3296        let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
3297        let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
3298        let file = tempfile::tempfile().unwrap();
3299
3300        let mut writer =
3301            ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
3302                .expect("Unable to write file");
3303        writer.write(&expected_batch).unwrap();
3304        writer.close().unwrap();
3305    }
3306
3307    #[test]
3308    fn check_page_offset_index_with_nan() {
3309        let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
3310        let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
3311        let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
3312
3313        let mut out = Vec::with_capacity(1024);
3314        let mut writer =
3315            ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
3316        writer.write(&batch).unwrap();
3317        let file_meta_data = writer.close().unwrap();
3318        for row_group in file_meta_data.row_groups() {
3319            for column in row_group.columns() {
3320                assert!(column.offset_index_offset().is_some());
3321                assert!(column.offset_index_length().is_some());
3322                assert!(column.column_index_offset().is_none());
3323                assert!(column.column_index_length().is_none());
3324            }
3325        }
3326    }
3327
3328    #[test]
3329    fn i8_single_column() {
3330        required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
3331    }
3332
3333    #[test]
3334    fn i16_single_column() {
3335        required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
3336    }
3337
3338    #[test]
3339    fn i32_single_column() {
3340        required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
3341    }
3342
3343    #[test]
3344    fn i64_single_column() {
3345        required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
3346    }
3347
3348    #[test]
3349    fn u8_single_column() {
3350        required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
3351    }
3352
3353    #[test]
3354    fn u16_single_column() {
3355        required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
3356    }
3357
3358    #[test]
3359    fn u32_single_column() {
3360        required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
3361    }
3362
3363    #[test]
3364    fn u64_single_column() {
3365        required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
3366    }
3367
3368    #[test]
3369    fn f32_single_column() {
3370        required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
3371    }
3372
3373    #[test]
3374    fn f64_single_column() {
3375        required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
3376    }
3377
    // The timestamp array types don't implement From<Vec<T>> because they need the timezone
    // argument, and they also don't support building from a Vec<Option<T>>, so call
    // one_column_roundtrip manually instead of calling required_and_optional for these tests.
3381
3382    #[test]
3383    fn timestamp_second_single_column() {
3384        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
3385        let values = Arc::new(TimestampSecondArray::from(raw_values));
3386
3387        one_column_roundtrip(values, false);
3388    }
3389
3390    #[test]
3391    fn timestamp_millisecond_single_column() {
3392        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
3393        let values = Arc::new(TimestampMillisecondArray::from(raw_values));
3394
3395        one_column_roundtrip(values, false);
3396    }
3397
3398    #[test]
3399    fn timestamp_microsecond_single_column() {
3400        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
3401        let values = Arc::new(TimestampMicrosecondArray::from(raw_values));
3402
3403        one_column_roundtrip(values, false);
3404    }
3405
3406    #[test]
3407    fn timestamp_nanosecond_single_column() {
3408        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
3409        let values = Arc::new(TimestampNanosecondArray::from(raw_values));
3410
3411        one_column_roundtrip(values, false);
3412    }
3413
3414    #[test]
3415    fn date32_single_column() {
3416        required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
3417    }
3418
3419    #[test]
3420    fn date64_single_column() {
3421        // Date64 must be a multiple of 86400000, see ARROW-10925
3422        required_and_optional::<Date64Array, _>(
3423            (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
3424        );
3425    }
3426
3427    #[test]
3428    fn time32_second_single_column() {
3429        required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
3430    }
3431
3432    #[test]
3433    fn time32_millisecond_single_column() {
3434        required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
3435    }
3436
3437    #[test]
3438    fn time64_microsecond_single_column() {
3439        required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
3440    }
3441
3442    #[test]
3443    fn time64_nanosecond_single_column() {
3444        required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
3445    }
3446
3447    #[test]
3448    fn duration_second_single_column() {
3449        required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
3450    }
3451
3452    #[test]
3453    fn duration_millisecond_single_column() {
3454        required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
3455    }
3456
3457    #[test]
3458    fn duration_microsecond_single_column() {
3459        required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
3460    }
3461
3462    #[test]
3463    fn duration_nanosecond_single_column() {
3464        required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
3465    }
3466
3467    #[test]
3468    fn interval_year_month_single_column() {
3469        required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
3470    }
3471
3472    #[test]
3473    fn interval_day_time_single_column() {
3474        required_and_optional::<IntervalDayTimeArray, _>(vec![
3475            IntervalDayTime::new(0, 1),
3476            IntervalDayTime::new(0, 3),
3477            IntervalDayTime::new(3, -2),
3478            IntervalDayTime::new(-200, 4),
3479        ]);
3480    }
3481
3482    #[test]
3483    #[should_panic(
3484        expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
3485    )]
3486    fn interval_month_day_nano_single_column() {
3487        required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
3488            IntervalMonthDayNano::new(0, 1, 5),
3489            IntervalMonthDayNano::new(0, 3, 2),
3490            IntervalMonthDayNano::new(3, -2, -5),
3491            IntervalMonthDayNano::new(-200, 4, -1),
3492        ]);
3493    }
3494
3495    #[test]
3496    fn binary_single_column() {
3497        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
3498        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
3499        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
3500
3501        // BinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
3502        values_required::<BinaryArray, _>(many_vecs_iter);
3503    }
3504
3505    #[test]
3506    fn binary_view_single_column() {
3507        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
3508        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
3509        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
3510
3511        // BinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
3512        values_required::<BinaryViewArray, _>(many_vecs_iter);
3513    }
3514
3515    #[test]
3516    fn i32_column_bloom_filter_at_end() {
3517        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
3518        let mut options = RoundTripOptions::new(array, false);
3519        options.bloom_filter = true;
3520        options.bloom_filter_position = BloomFilterPosition::End;
3521
3522        let files = one_column_roundtrip_with_options(options);
3523        check_bloom_filter(
3524            files,
3525            "col".to_string(),
3526            (0..SMALL_SIZE as i32).collect(),
3527            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
3528        );
3529    }
3530
3531    #[test]
3532    fn i32_column_bloom_filter() {
3533        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
3534        let mut options = RoundTripOptions::new(array, false);
3535        options.bloom_filter = true;
3536
3537        let files = one_column_roundtrip_with_options(options);
3538        check_bloom_filter(
3539            files,
3540            "col".to_string(),
3541            (0..SMALL_SIZE as i32).collect(),
3542            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
3543        );
3544    }
3545
3546    /// Test that bloom filter folding produces correct results even when
3547    /// the configured NDV differs significantly from actual NDV.
3548    /// A large NDV means a larger initial filter that gets folded down;
3549    /// a small NDV means a smaller initial filter.
3550    #[test]
3551    fn i32_column_bloom_filter_fixed_ndv() {
3552        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
3553
3554        // NDV much larger than actual distinct values — tests folding a large filter down
3555        let mut options = RoundTripOptions::new(array.clone(), false);
3556        options.bloom_filter = true;
3557        options.bloom_filter_ndv = Some(1_000_000);
3558
3559        let files = one_column_roundtrip_with_options(options);
3560        check_bloom_filter(
3561            files,
3562            "col".to_string(),
3563            (0..SMALL_SIZE as i32).collect(),
3564            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
3565        );
3566
3567        // NDV smaller than actual distinct values — tests the underestimate path
3568        let mut options = RoundTripOptions::new(array, false);
3569        options.bloom_filter = true;
3570        options.bloom_filter_ndv = Some(3);
3571
3572        let files = one_column_roundtrip_with_options(options);
3573        check_bloom_filter(
3574            files,
3575            "col".to_string(),
3576            (0..SMALL_SIZE as i32).collect(),
3577            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
3578        );
3579    }
3580
3581    #[test]
3582    fn binary_column_bloom_filter() {
3583        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
3584        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
3585        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
3586
3587        let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
3588        let mut options = RoundTripOptions::new(array, false);
3589        options.bloom_filter = true;
3590
3591        let files = one_column_roundtrip_with_options(options);
3592        check_bloom_filter(
3593            files,
3594            "col".to_string(),
3595            many_vecs,
3596            vec![vec![(SMALL_SIZE + 1) as u8]],
3597        );
3598    }
3599
3600    #[test]
3601    fn empty_string_null_column_bloom_filter() {
3602        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
3603        let raw_strs = raw_values.iter().map(|s| s.as_str());
3604
3605        let array = Arc::new(StringArray::from_iter_values(raw_strs));
3606        let mut options = RoundTripOptions::new(array, false);
3607        options.bloom_filter = true;
3608
3609        let files = one_column_roundtrip_with_options(options);
3610
3611        let optional_raw_values: Vec<_> = raw_values
3612            .iter()
3613            .enumerate()
3614            .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
3615            .collect();
3616        // For null slots, empty string should not be in bloom filter.
3617        check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
3618    }
3619
3620    #[test]
3621    fn large_binary_single_column() {
3622        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
3623        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
3624        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
3625
3626        // LargeBinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
3627        values_required::<LargeBinaryArray, _>(many_vecs_iter);
3628    }
3629
3630    #[test]
3631    fn fixed_size_binary_single_column() {
3632        let mut builder = FixedSizeBinaryBuilder::new(4);
3633        builder.append_value(b"0123").unwrap();
3634        builder.append_null();
3635        builder.append_value(b"8910").unwrap();
3636        builder.append_value(b"1112").unwrap();
3637        let array = Arc::new(builder.finish());
3638
3639        one_column_roundtrip(array, true);
3640    }
3641
3642    #[test]
3643    fn string_single_column() {
3644        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
3645        let raw_strs = raw_values.iter().map(|s| s.as_str());
3646
3647        required_and_optional::<StringArray, _>(raw_strs);
3648    }
3649
3650    #[test]
3651    fn large_string_single_column() {
3652        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
3653        let raw_strs = raw_values.iter().map(|s| s.as_str());
3654
3655        required_and_optional::<LargeStringArray, _>(raw_strs);
3656    }
3657
3658    #[test]
3659    fn string_view_single_column() {
3660        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
3661        let raw_strs = raw_values.iter().map(|s| s.as_str());
3662
3663        required_and_optional::<StringViewArray, _>(raw_strs);
3664    }
3665
3666    #[test]
3667    fn null_list_single_column() {
3668        let null_field = Field::new_list_field(DataType::Null, true);
3669        let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);
3670
3671        let schema = Schema::new(vec![list_field]);
3672
3673        // Build [[], null, [null, null]]
3674        let a_values = NullArray::new(2);
3675        let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
3676        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
3677            DataType::Null,
3678            true,
3679        ))))
3680        .len(3)
3681        .add_buffer(a_value_offsets)
3682        .null_bit_buffer(Some(Buffer::from([0b00000101])))
3683        .add_child_data(a_values.into_data())
3684        .build()
3685        .unwrap();
3686
3687        let a = ListArray::from(a_list_data);
3688
3689        assert!(a.is_valid(0));
3690        assert!(!a.is_valid(1));
3691        assert!(a.is_valid(2));
3692
3693        assert_eq!(a.value(0).len(), 0);
3694        assert_eq!(a.value(2).len(), 2);
3695        assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);
3696
3697        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
3698        roundtrip(batch, None);
3699    }
3700
3701    #[test]
3702    fn list_single_column() {
3703        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3704        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
3705        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
3706            DataType::Int32,
3707            false,
3708        ))))
3709        .len(5)
3710        .add_buffer(a_value_offsets)
3711        .null_bit_buffer(Some(Buffer::from([0b00011011])))
3712        .add_child_data(a_values.into_data())
3713        .build()
3714        .unwrap();
3715
3716        assert_eq!(a_list_data.null_count(), 1);
3717
3718        let a = ListArray::from(a_list_data);
3719        let values = Arc::new(a);
3720
3721        one_column_roundtrip(values, true);
3722    }
3723
3724    #[test]
3725    fn large_list_single_column() {
3726        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3727        let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
3728        let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
3729            "large_item",
3730            DataType::Int32,
3731            true,
3732        ))))
3733        .len(5)
3734        .add_buffer(a_value_offsets)
3735        .add_child_data(a_values.into_data())
3736        .null_bit_buffer(Some(Buffer::from([0b00011011])))
3737        .build()
3738        .unwrap();
3739
3740        // I think this setup is incorrect because this should pass
3741        assert_eq!(a_list_data.null_count(), 1);
3742
3743        let a = LargeListArray::from(a_list_data);
3744        let values = Arc::new(a);
3745
3746        one_column_roundtrip(values, true);
3747    }
3748
3749    #[test]
3750    fn list_nested_nulls() {
3751        use arrow::datatypes::Int32Type;
3752        let data = vec![
3753            Some(vec![Some(1)]),
3754            Some(vec![Some(2), Some(3)]),
3755            None,
3756            Some(vec![Some(4), Some(5), None]),
3757            Some(vec![None]),
3758            Some(vec![Some(6), Some(7)]),
3759        ];
3760
3761        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
3762        one_column_roundtrip(Arc::new(list), true);
3763
3764        let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
3765        one_column_roundtrip(Arc::new(list), true);
3766    }
3767
3768    #[test]
3769    fn struct_single_column() {
3770        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3771        let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
3772        let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
3773
3774        let values = Arc::new(s);
3775        one_column_roundtrip(values, false);
3776    }
3777
3778    #[test]
3779    fn list_and_map_coerced_names() {
3780        // Create map and list with non-Parquet naming
3781        let list_field =
3782            Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
3783        let map_field = Field::new_map(
3784            "my_map",
3785            "entries",
3786            Field::new("keys", DataType::Int32, false),
3787            Field::new("values", DataType::Int32, true),
3788            false,
3789            true,
3790        );
3791
3792        let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
3793        let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();
3794
3795        let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
3796
3797        // Write data to Parquet but coerce names to match spec
3798        let props = Some(WriterProperties::builder().set_coerce_types(true).build());
3799        let file = tempfile::tempfile().unwrap();
3800        let mut writer =
3801            ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();
3802
3803        let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
3804        writer.write(&batch).unwrap();
3805        let file_metadata = writer.close().unwrap();
3806
3807        let schema = file_metadata.file_metadata().schema();
3808        // Coerced name of "item" should be "element"
3809        let list_field = &schema.get_fields()[0].get_fields()[0];
3810        assert_eq!(list_field.get_fields()[0].name(), "element");
3811
3812        let map_field = &schema.get_fields()[1].get_fields()[0];
3813        // Coerced name of "entries" should be "key_value"
3814        assert_eq!(map_field.name(), "key_value");
3815        // Coerced name of "keys" should be "key"
3816        assert_eq!(map_field.get_fields()[0].name(), "key");
3817        // Coerced name of "values" should be "value"
3818        assert_eq!(map_field.get_fields()[1].name(), "value");
3819
3820        // Double check schema after reading from the file
3821        let reader = SerializedFileReader::new(file).unwrap();
3822        let file_schema = reader.metadata().file_metadata().schema();
3823        let fields = file_schema.get_fields();
3824        let list_field = &fields[0].get_fields()[0];
3825        assert_eq!(list_field.get_fields()[0].name(), "element");
3826        let map_field = &fields[1].get_fields()[0];
3827        assert_eq!(map_field.name(), "key_value");
3828        assert_eq!(map_field.get_fields()[0].name(), "key");
3829        assert_eq!(map_field.get_fields()[1].name(), "value");
3830    }
3831
3832    #[test]
3833    fn fallback_flush_data_page() {
3834        //tests if the Fallback::flush_data_page clears all buffers correctly
3835        let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
3836        let values = Arc::new(StringArray::from(raw_values));
3837        let encodings = vec![
3838            Encoding::DELTA_BYTE_ARRAY,
3839            Encoding::DELTA_LENGTH_BYTE_ARRAY,
3840        ];
3841        let data_type = values.data_type().clone();
3842        let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
3843        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3844
3845        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
3846        let data_page_size_limit: usize = 32;
3847        let write_batch_size: usize = 16;
3848
3849        for encoding in &encodings {
3850            for row_group_size in row_group_sizes {
3851                let props = WriterProperties::builder()
3852                    .set_writer_version(WriterVersion::PARQUET_2_0)
3853                    .set_max_row_group_row_count(Some(row_group_size))
3854                    .set_dictionary_enabled(false)
3855                    .set_encoding(*encoding)
3856                    .set_data_page_size_limit(data_page_size_limit)
3857                    .set_write_batch_size(write_batch_size)
3858                    .build();
3859
3860                roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
3861                    let string_array_a = StringArray::from(a.clone());
3862                    let string_array_b = StringArray::from(b.clone());
3863                    let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
3864                    let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
3865                    assert_eq!(
3866                        vec_a, vec_b,
3867                        "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
3868                    );
3869                });
3870            }
3871        }
3872    }
3873
3874    #[test]
3875    fn arrow_writer_string_dictionary() {
3876        // define schema
3877        #[allow(deprecated)]
3878        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3879            "dictionary",
3880            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3881            true,
3882            42,
3883            true,
3884        )]));
3885
3886        // create some data
3887        let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3888            .iter()
3889            .copied()
3890            .collect();
3891
3892        // build a record batch
3893        one_column_roundtrip_with_schema(Arc::new(d), schema);
3894    }
3895
3896    #[test]
3897    fn arrow_writer_test_type_compatibility() {
3898        fn ensure_compatible_write<T1, T2>(array1: T1, array2: T2, expected_result: T1)
3899        where
3900            T1: Array + 'static,
3901            T2: Array + 'static,
3902        {
3903            let schema1 = Arc::new(Schema::new(vec![Field::new(
3904                "a",
3905                array1.data_type().clone(),
3906                false,
3907            )]));
3908
3909            let file = tempfile().unwrap();
3910            let mut writer =
3911                ArrowWriter::try_new(file.try_clone().unwrap(), schema1.clone(), None).unwrap();
3912
3913            let rb1 = RecordBatch::try_new(schema1.clone(), vec![Arc::new(array1)]).unwrap();
3914            writer.write(&rb1).unwrap();
3915
3916            let schema2 = Arc::new(Schema::new(vec![Field::new(
3917                "a",
3918                array2.data_type().clone(),
3919                false,
3920            )]));
3921            let rb2 = RecordBatch::try_new(schema2, vec![Arc::new(array2)]).unwrap();
3922            writer.write(&rb2).unwrap();
3923
3924            writer.close().unwrap();
3925
3926            let mut record_batch_reader =
3927                ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
3928            let actual_batch = record_batch_reader.next().unwrap().unwrap();
3929
3930            let expected_batch =
3931                RecordBatch::try_new(schema1, vec![Arc::new(expected_result)]).unwrap();
3932            assert_eq!(actual_batch, expected_batch);
3933        }
3934
3935        // check compatibility between native and dictionaries
3936
3937        ensure_compatible_write(
3938            DictionaryArray::new(
3939                UInt8Array::from_iter_values(vec![0]),
3940                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3941            ),
3942            StringArray::from_iter_values(vec!["barquet"]),
3943            DictionaryArray::new(
3944                UInt8Array::from_iter_values(vec![0, 1]),
3945                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3946            ),
3947        );
3948
3949        ensure_compatible_write(
3950            StringArray::from_iter_values(vec!["parquet"]),
3951            DictionaryArray::new(
3952                UInt8Array::from_iter_values(vec![0]),
3953                Arc::new(StringArray::from_iter_values(vec!["barquet"])),
3954            ),
3955            StringArray::from_iter_values(vec!["parquet", "barquet"]),
3956        );
3957
3958        // check compatibility between dictionaries with different key types
3959
3960        ensure_compatible_write(
3961            DictionaryArray::new(
3962                UInt8Array::from_iter_values(vec![0]),
3963                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3964            ),
3965            DictionaryArray::new(
3966                UInt16Array::from_iter_values(vec![0]),
3967                Arc::new(StringArray::from_iter_values(vec!["barquet"])),
3968            ),
3969            DictionaryArray::new(
3970                UInt8Array::from_iter_values(vec![0, 1]),
3971                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3972            ),
3973        );
3974
3975        // check compatibility between dictionaries with different value types
3976        ensure_compatible_write(
3977            DictionaryArray::new(
3978                UInt8Array::from_iter_values(vec![0]),
3979                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3980            ),
3981            DictionaryArray::new(
3982                UInt8Array::from_iter_values(vec![0]),
3983                Arc::new(LargeStringArray::from_iter_values(vec!["barquet"])),
3984            ),
3985            DictionaryArray::new(
3986                UInt8Array::from_iter_values(vec![0, 1]),
3987                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3988            ),
3989        );
3990
3991        // check compatibility between a dictionary and a native array with a different type
3992        ensure_compatible_write(
3993            DictionaryArray::new(
3994                UInt8Array::from_iter_values(vec![0]),
3995                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3996            ),
3997            LargeStringArray::from_iter_values(vec!["barquet"]),
3998            DictionaryArray::new(
3999                UInt8Array::from_iter_values(vec![0, 1]),
4000                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
4001            ),
4002        );
4003
4004        // check compatibility for string types
4005
4006        ensure_compatible_write(
4007            StringArray::from_iter_values(vec!["parquet"]),
4008            LargeStringArray::from_iter_values(vec!["barquet"]),
4009            StringArray::from_iter_values(vec!["parquet", "barquet"]),
4010        );
4011
4012        ensure_compatible_write(
4013            LargeStringArray::from_iter_values(vec!["parquet"]),
4014            StringArray::from_iter_values(vec!["barquet"]),
4015            LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
4016        );
4017
4018        ensure_compatible_write(
4019            StringArray::from_iter_values(vec!["parquet"]),
4020            StringViewArray::from_iter_values(vec!["barquet"]),
4021            StringArray::from_iter_values(vec!["parquet", "barquet"]),
4022        );
4023
4024        ensure_compatible_write(
4025            StringViewArray::from_iter_values(vec!["parquet"]),
4026            StringArray::from_iter_values(vec!["barquet"]),
4027            StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
4028        );
4029
4030        ensure_compatible_write(
4031            LargeStringArray::from_iter_values(vec!["parquet"]),
4032            StringViewArray::from_iter_values(vec!["barquet"]),
4033            LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
4034        );
4035
4036        ensure_compatible_write(
4037            StringViewArray::from_iter_values(vec!["parquet"]),
4038            LargeStringArray::from_iter_values(vec!["barquet"]),
4039            StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
4040        );
4041
4042        // check compatibility for binary types
4043
4044        ensure_compatible_write(
4045            BinaryArray::from_iter_values(vec![b"parquet"]),
4046            LargeBinaryArray::from_iter_values(vec![b"barquet"]),
4047            BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
4048        );
4049
4050        ensure_compatible_write(
4051            LargeBinaryArray::from_iter_values(vec![b"parquet"]),
4052            BinaryArray::from_iter_values(vec![b"barquet"]),
4053            LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
4054        );
4055
4056        ensure_compatible_write(
4057            BinaryArray::from_iter_values(vec![b"parquet"]),
4058            BinaryViewArray::from_iter_values(vec![b"barquet"]),
4059            BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
4060        );
4061
4062        ensure_compatible_write(
4063            BinaryViewArray::from_iter_values(vec![b"parquet"]),
4064            BinaryArray::from_iter_values(vec![b"barquet"]),
4065            BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
4066        );
4067
4068        ensure_compatible_write(
4069            BinaryViewArray::from_iter_values(vec![b"parquet"]),
4070            LargeBinaryArray::from_iter_values(vec![b"barquet"]),
4071            BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
4072        );
4073
4074        ensure_compatible_write(
4075            LargeBinaryArray::from_iter_values(vec![b"parquet"]),
4076            BinaryViewArray::from_iter_values(vec![b"barquet"]),
4077            LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
4078        );
4079
4080        // check compatibility for list types
4081
4082        let list_field_metadata = HashMap::from_iter(vec![(
4083            PARQUET_FIELD_ID_META_KEY.to_string(),
4084            "1".to_string(),
4085        )]);
4086        let list_field = Field::new_list_field(DataType::Int32, false);
4087
4088        let values1 = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4]));
4089        let offsets1 = OffsetBuffer::new(vec![0, 2, 5].into());
4090
4091        let values2 = Arc::new(Int32Array::from(vec![5, 6, 7, 8, 9]));
4092        let offsets2 = OffsetBuffer::new(vec![0, 3, 5].into());
4093
4094        let values_expected = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]));
4095        let offsets_expected = OffsetBuffer::new(vec![0, 2, 5, 8, 10].into());
4096
4097        ensure_compatible_write(
4098            // when the initial schema has the metadata ...
4099            ListArray::try_new(
4100                Arc::new(
4101                    list_field
4102                        .clone()
4103                        .with_metadata(list_field_metadata.clone()),
4104                ),
4105                offsets1,
4106                values1,
4107                None,
4108            )
4109            .unwrap(),
4110            // ... and some intermediate schema doesn't have the metadata
4111            ListArray::try_new(Arc::new(list_field.clone()), offsets2, values2, None).unwrap(),
4112            // ... the write will still go through, and the resulting schema will inherit the initial metadata
4113            ListArray::try_new(
4114                Arc::new(
4115                    list_field
4116                        .clone()
4117                        .with_metadata(list_field_metadata.clone()),
4118                ),
4119                offsets_expected,
4120                values_expected,
4121                None,
4122            )
4123            .unwrap(),
4124        );
4125    }
4126
4127    #[test]
4128    fn arrow_writer_primitive_dictionary() {
4129        // define schema
4130        #[allow(deprecated)]
4131        let schema = Arc::new(Schema::new(vec![Field::new_dict(
4132            "dictionary",
4133            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
4134            true,
4135            42,
4136            true,
4137        )]));
4138
4139        // create some data
4140        let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
4141        builder.append(12345678).unwrap();
4142        builder.append_null();
4143        builder.append(22345678).unwrap();
4144        builder.append(12345678).unwrap();
4145        let d = builder.finish();
4146
4147        one_column_roundtrip_with_schema(Arc::new(d), schema);
4148    }
4149
4150    #[test]
4151    fn arrow_writer_decimal32_dictionary() {
4152        let integers = vec![12345, 56789, 34567];
4153
4154        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
4155
4156        let values = Decimal32Array::from(integers.clone())
4157            .with_precision_and_scale(5, 2)
4158            .unwrap();
4159
4160        let array = DictionaryArray::new(keys, Arc::new(values));
4161        one_column_roundtrip(Arc::new(array.clone()), true);
4162
4163        let values = Decimal32Array::from(integers)
4164            .with_precision_and_scale(9, 2)
4165            .unwrap();
4166
4167        let array = array.with_values(Arc::new(values));
4168        one_column_roundtrip(Arc::new(array), true);
4169    }
4170
4171    #[test]
4172    fn arrow_writer_decimal64_dictionary() {
4173        let integers = vec![12345, 56789, 34567];
4174
4175        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
4176
4177        let values = Decimal64Array::from(integers.clone())
4178            .with_precision_and_scale(5, 2)
4179            .unwrap();
4180
4181        let array = DictionaryArray::new(keys, Arc::new(values));
4182        one_column_roundtrip(Arc::new(array.clone()), true);
4183
4184        let values = Decimal64Array::from(integers)
4185            .with_precision_and_scale(12, 2)
4186            .unwrap();
4187
4188        let array = array.with_values(Arc::new(values));
4189        one_column_roundtrip(Arc::new(array), true);
4190    }
4191
4192    #[test]
4193    fn arrow_writer_decimal128_dictionary() {
4194        let integers = vec![12345, 56789, 34567];
4195
4196        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
4197
4198        let values = Decimal128Array::from(integers.clone())
4199            .with_precision_and_scale(5, 2)
4200            .unwrap();
4201
4202        let array = DictionaryArray::new(keys, Arc::new(values));
4203        one_column_roundtrip(Arc::new(array.clone()), true);
4204
4205        let values = Decimal128Array::from(integers)
4206            .with_precision_and_scale(12, 2)
4207            .unwrap();
4208
4209        let array = array.with_values(Arc::new(values));
4210        one_column_roundtrip(Arc::new(array), true);
4211    }
4212
4213    #[test]
4214    fn arrow_writer_decimal256_dictionary() {
4215        let integers = vec![
4216            i256::from_i128(12345),
4217            i256::from_i128(56789),
4218            i256::from_i128(34567),
4219        ];
4220
4221        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
4222
4223        let values = Decimal256Array::from(integers.clone())
4224            .with_precision_and_scale(5, 2)
4225            .unwrap();
4226
4227        let array = DictionaryArray::new(keys, Arc::new(values));
4228        one_column_roundtrip(Arc::new(array.clone()), true);
4229
4230        let values = Decimal256Array::from(integers)
4231            .with_precision_and_scale(12, 2)
4232            .unwrap();
4233
4234        let array = array.with_values(Arc::new(values));
4235        one_column_roundtrip(Arc::new(array), true);
4236    }
4237
4238    #[test]
4239    fn arrow_writer_string_dictionary_unsigned_index() {
4240        // define schema
4241        #[allow(deprecated)]
4242        let schema = Arc::new(Schema::new(vec![Field::new_dict(
4243            "dictionary",
4244            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
4245            true,
4246            42,
4247            true,
4248        )]));
4249
4250        // create some data
4251        let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
4252            .iter()
4253            .copied()
4254            .collect();
4255
4256        one_column_roundtrip_with_schema(Arc::new(d), schema);
4257    }
4258
4259    #[test]
4260    fn u32_min_max() {
4261        // check values roundtrip through parquet
4262        let src = [
4263            u32::MIN,
4264            u32::MIN + 1,
4265            (i32::MAX as u32) - 1,
4266            i32::MAX as u32,
4267            (i32::MAX as u32) + 1,
4268            u32::MAX - 1,
4269            u32::MAX,
4270        ];
4271        let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
4272        let files = one_column_roundtrip(values, false);
4273
4274        for file in files {
4275            // check statistics are valid
4276            let reader = SerializedFileReader::new(file).unwrap();
4277            let metadata = reader.metadata();
4278
4279            let mut row_offset = 0;
4280            for row_group in metadata.row_groups() {
4281                assert_eq!(row_group.num_columns(), 1);
4282                let column = row_group.column(0);
4283
4284                let num_values = column.num_values() as usize;
4285                let src_slice = &src[row_offset..row_offset + num_values];
4286                row_offset += column.num_values() as usize;
4287
4288                let stats = column.statistics().unwrap();
4289                if let Statistics::Int32(stats) = stats {
4290                    assert_eq!(
4291                        *stats.min_opt().unwrap() as u32,
4292                        *src_slice.iter().min().unwrap()
4293                    );
4294                    assert_eq!(
4295                        *stats.max_opt().unwrap() as u32,
4296                        *src_slice.iter().max().unwrap()
4297                    );
4298                } else {
4299                    panic!("Statistics::Int32 missing")
4300                }
4301            }
4302        }
4303    }
4304
4305    #[test]
4306    fn u64_min_max() {
4307        // check values roundtrip through parquet
4308        let src = [
4309            u64::MIN,
4310            u64::MIN + 1,
4311            (i64::MAX as u64) - 1,
4312            i64::MAX as u64,
4313            (i64::MAX as u64) + 1,
4314            u64::MAX - 1,
4315            u64::MAX,
4316        ];
4317        let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
4318        let files = one_column_roundtrip(values, false);
4319
4320        for file in files {
4321            // check statistics are valid
4322            let reader = SerializedFileReader::new(file).unwrap();
4323            let metadata = reader.metadata();
4324
4325            let mut row_offset = 0;
4326            for row_group in metadata.row_groups() {
4327                assert_eq!(row_group.num_columns(), 1);
4328                let column = row_group.column(0);
4329
4330                let num_values = column.num_values() as usize;
4331                let src_slice = &src[row_offset..row_offset + num_values];
4332                row_offset += column.num_values() as usize;
4333
4334                let stats = column.statistics().unwrap();
4335                if let Statistics::Int64(stats) = stats {
4336                    assert_eq!(
4337                        *stats.min_opt().unwrap() as u64,
4338                        *src_slice.iter().min().unwrap()
4339                    );
4340                    assert_eq!(
4341                        *stats.max_opt().unwrap() as u64,
4342                        *src_slice.iter().max().unwrap()
4343                    );
4344                } else {
4345                    panic!("Statistics::Int64 missing")
4346                }
4347            }
4348        }
4349    }
4350
4351    #[test]
4352    fn statistics_null_counts_only_nulls() {
4353        // check that null-count statistics for "only NULL"-columns are correct
4354        let values = Arc::new(UInt64Array::from(vec![None, None]));
4355        let files = one_column_roundtrip(values, true);
4356
4357        for file in files {
4358            // check statistics are valid
4359            let reader = SerializedFileReader::new(file).unwrap();
4360            let metadata = reader.metadata();
4361            assert_eq!(metadata.num_row_groups(), 1);
4362            let row_group = metadata.row_group(0);
4363            assert_eq!(row_group.num_columns(), 1);
4364            let column = row_group.column(0);
4365            let stats = column.statistics().unwrap();
4366            assert_eq!(stats.null_count_opt(), Some(2));
4367        }
4368    }
4369
4370    #[test]
4371    fn test_list_of_struct_roundtrip() {
4372        // define schema
4373        let int_field = Field::new("a", DataType::Int32, true);
4374        let int_field2 = Field::new("b", DataType::Int32, true);
4375
4376        let int_builder = Int32Builder::with_capacity(10);
4377        let int_builder2 = Int32Builder::with_capacity(10);
4378
4379        let struct_builder = StructBuilder::new(
4380            vec![int_field, int_field2],
4381            vec![Box::new(int_builder), Box::new(int_builder2)],
4382        );
4383        let mut list_builder = ListBuilder::new(struct_builder);
4384
4385        // Construct the following array
4386        // [{a: 1, b: 2}], [], null, [null, null], [{a: null, b: 3}], [{a: 2, b: null}]
4387
4388        // [{a: 1, b: 2}]
4389        let values = list_builder.values();
4390        values
4391            .field_builder::<Int32Builder>(0)
4392            .unwrap()
4393            .append_value(1);
4394        values
4395            .field_builder::<Int32Builder>(1)
4396            .unwrap()
4397            .append_value(2);
4398        values.append(true);
4399        list_builder.append(true);
4400
4401        // []
4402        list_builder.append(true);
4403
4404        // null
4405        list_builder.append(false);
4406
4407        // [null, null]
4408        let values = list_builder.values();
4409        values
4410            .field_builder::<Int32Builder>(0)
4411            .unwrap()
4412            .append_null();
4413        values
4414            .field_builder::<Int32Builder>(1)
4415            .unwrap()
4416            .append_null();
4417        values.append(false);
4418        values
4419            .field_builder::<Int32Builder>(0)
4420            .unwrap()
4421            .append_null();
4422        values
4423            .field_builder::<Int32Builder>(1)
4424            .unwrap()
4425            .append_null();
4426        values.append(false);
4427        list_builder.append(true);
4428
4429        // [{a: null, b: 3}]
4430        let values = list_builder.values();
4431        values
4432            .field_builder::<Int32Builder>(0)
4433            .unwrap()
4434            .append_null();
4435        values
4436            .field_builder::<Int32Builder>(1)
4437            .unwrap()
4438            .append_value(3);
4439        values.append(true);
4440        list_builder.append(true);
4441
4442        // [{a: 2, b: null}]
4443        let values = list_builder.values();
4444        values
4445            .field_builder::<Int32Builder>(0)
4446            .unwrap()
4447            .append_value(2);
4448        values
4449            .field_builder::<Int32Builder>(1)
4450            .unwrap()
4451            .append_null();
4452        values.append(true);
4453        list_builder.append(true);
4454
4455        let array = Arc::new(list_builder.finish());
4456
4457        one_column_roundtrip(array, true);
4458    }
4459
4460    fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
4461        metadata.row_groups().iter().map(|x| x.num_rows()).collect()
4462    }
4463
4464    #[test]
4465    fn test_aggregates_records() {
4466        let arrays = [
4467            Int32Array::from((0..100).collect::<Vec<_>>()),
4468            Int32Array::from((0..50).collect::<Vec<_>>()),
4469            Int32Array::from((200..500).collect::<Vec<_>>()),
4470        ];
4471
4472        let schema = Arc::new(Schema::new(vec![Field::new(
4473            "int",
4474            ArrowDataType::Int32,
4475            false,
4476        )]));
4477
4478        let file = tempfile::tempfile().unwrap();
4479
4480        let props = WriterProperties::builder()
4481            .set_max_row_group_row_count(Some(200))
4482            .build();
4483
4484        let mut writer =
4485            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
4486
4487        for array in arrays {
4488            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
4489            writer.write(&batch).unwrap();
4490        }
4491
4492        writer.close().unwrap();
4493
4494        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4495        assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);
4496
4497        let batches = builder
4498            .with_batch_size(100)
4499            .build()
4500            .unwrap()
4501            .collect::<ArrowResult<Vec<_>>>()
4502            .unwrap();
4503
4504        assert_eq!(batches.len(), 5);
4505        assert!(batches.iter().all(|x| x.num_columns() == 1));
4506
4507        let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
4508
4509        assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
4510
4511        let values: Vec<_> = batches
4512            .iter()
4513            .flat_map(|x| {
4514                x.column(0)
4515                    .as_any()
4516                    .downcast_ref::<Int32Array>()
4517                    .unwrap()
4518                    .values()
4519                    .iter()
4520                    .cloned()
4521            })
4522            .collect();
4523
4524        let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
4525        assert_eq!(&values, &expected_values)
4526    }
4527
4528    #[test]
4529    fn complex_aggregate() {
4530        // Tests aggregating nested data
4531        let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
4532        let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
4533        let struct_a = Arc::new(Field::new(
4534            "struct_a",
4535            DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
4536            true,
4537        ));
4538
4539        let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
4540        let struct_b = Arc::new(Field::new(
4541            "struct_b",
4542            DataType::Struct(vec![list_a.clone()].into()),
4543            false,
4544        ));
4545
4546        let schema = Arc::new(Schema::new(vec![struct_b]));
4547
4548        // create nested data
4549        let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
4550        let field_b_array =
4551            Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);
4552
4553        let struct_a_array = StructArray::from(vec![
4554            (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
4555            (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
4556        ]);
4557
4558        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
4559            .len(5)
4560            .add_buffer(Buffer::from_iter(vec![
4561                0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
4562            ]))
4563            .null_bit_buffer(Some(Buffer::from_iter(vec![
4564                true, false, true, false, true,
4565            ])))
4566            .child_data(vec![struct_a_array.into_data()])
4567            .build()
4568            .unwrap();
4569
4570        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
4571        let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);
4572
4573        let batch1 =
4574            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
4575                .unwrap();
4576
4577        let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
4578        let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);
4579
4580        let struct_a_array = StructArray::from(vec![
4581            (field_a, Arc::new(field_a_array) as ArrayRef),
4582            (field_b, Arc::new(field_b_array) as ArrayRef),
4583        ]);
4584
4585        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
4586            .len(2)
4587            .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
4588            .child_data(vec![struct_a_array.into_data()])
4589            .build()
4590            .unwrap();
4591
4592        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
4593        let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);
4594
4595        let batch2 =
4596            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
4597                .unwrap();
4598
4599        let batches = &[batch1, batch2];
4600
4601        // Verify data is as expected
4602
4603        let expected = r#"
4604            +-------------------------------------------------------------------------------------------------------+
4605            | struct_b                                                                                              |
4606            +-------------------------------------------------------------------------------------------------------+
4607            | {list: [{leaf_a: 1, leaf_b: 1}]}                                                                      |
4608            | {list: }                                                                                              |
4609            | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]}                                               |
4610            | {list: }                                                                                              |
4611            | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]}                                                |
4612            | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
4613            | {list: [{leaf_a: 10, leaf_b: }]}                                                                      |
4614            +-------------------------------------------------------------------------------------------------------+
4615        "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");
4616
4617        let actual = pretty_format_batches(batches).unwrap().to_string();
4618        assert_eq!(actual, expected);
4619
4620        // Write data
4621        let file = tempfile::tempfile().unwrap();
4622        let props = WriterProperties::builder()
4623            .set_max_row_group_row_count(Some(6))
4624            .build();
4625
4626        let mut writer =
4627            ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
4628
4629        for batch in batches {
4630            writer.write(batch).unwrap();
4631        }
4632        writer.close().unwrap();
4633
4634        // Read Data
4635        // Should have written entire first batch and first row of second to the first row group
4636        // leaving a single row in the second row group
4637
4638        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4639        assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);
4640
4641        let batches = builder
4642            .with_batch_size(2)
4643            .build()
4644            .unwrap()
4645            .collect::<ArrowResult<Vec<_>>>()
4646            .unwrap();
4647
4648        assert_eq!(batches.len(), 4);
4649        let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
4650        assert_eq!(&batch_counts, &[2, 2, 2, 1]);
4651
4652        let actual = pretty_format_batches(&batches).unwrap().to_string();
4653        assert_eq!(actual, expected);
4654    }
4655
4656    #[test]
4657    fn test_arrow_writer_metadata() {
4658        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
4659        let file_schema = batch_schema.clone().with_metadata(
4660            vec![("foo".to_string(), "bar".to_string())]
4661                .into_iter()
4662                .collect(),
4663        );
4664
4665        let batch = RecordBatch::try_new(
4666            Arc::new(batch_schema),
4667            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4668        )
4669        .unwrap();
4670
4671        let mut buf = Vec::with_capacity(1024);
4672        let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
4673        writer.write(&batch).unwrap();
4674        writer.close().unwrap();
4675    }
4676
4677    #[test]
4678    fn test_arrow_writer_nullable() {
4679        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
4680        let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
4681        let file_schema = Arc::new(file_schema);
4682
4683        let batch = RecordBatch::try_new(
4684            Arc::new(batch_schema),
4685            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4686        )
4687        .unwrap();
4688
4689        let mut buf = Vec::with_capacity(1024);
4690        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
4691        writer.write(&batch).unwrap();
4692        writer.close().unwrap();
4693
4694        let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
4695        let back = read.next().unwrap().unwrap();
4696        assert_eq!(back.schema(), file_schema);
4697        assert_ne!(back.schema(), batch.schema());
4698        assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
4699    }
4700
4701    #[test]
4702    fn in_progress_accounting() {
4703        // define schema
4704        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
4705
4706        // create some data
4707        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
4708
4709        // build a record batch
4710        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
4711
4712        let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
4713
4714        // starts empty
4715        assert_eq!(writer.in_progress_size(), 0);
4716        assert_eq!(writer.in_progress_rows(), 0);
4717        assert_eq!(writer.memory_size(), 0);
4718        assert_eq!(writer.bytes_written(), 4); // Initial header
4719        writer.write(&batch).unwrap();
4720
4721        // updated on write
4722        let initial_size = writer.in_progress_size();
4723        assert!(initial_size > 0);
4724        assert_eq!(writer.in_progress_rows(), 5);
4725        let initial_memory = writer.memory_size();
4726        assert!(initial_memory > 0);
4727        // memory estimate is larger than estimated encoded size
4728        assert!(
4729            initial_size <= initial_memory,
4730            "{initial_size} <= {initial_memory}"
4731        );
4732
4733        // updated on second write
4734        writer.write(&batch).unwrap();
4735        assert!(writer.in_progress_size() > initial_size);
4736        assert_eq!(writer.in_progress_rows(), 10);
4737        assert!(writer.memory_size() > initial_memory);
4738        assert!(
4739            writer.in_progress_size() <= writer.memory_size(),
4740            "in_progress_size {} <= memory_size {}",
4741            writer.in_progress_size(),
4742            writer.memory_size()
4743        );
4744
4745        // in progress tracking is cleared, but the overall data written is updated
4746        let pre_flush_bytes_written = writer.bytes_written();
4747        writer.flush().unwrap();
4748        assert_eq!(writer.in_progress_size(), 0);
4749        assert_eq!(writer.memory_size(), 0);
4750        assert!(writer.bytes_written() > pre_flush_bytes_written);
4751
4752        writer.close().unwrap();
4753    }
4754
4755    #[test]
4756    fn test_writer_all_null() {
4757        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
4758        let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
4759        let batch = RecordBatch::try_from_iter(vec![
4760            ("a", Arc::new(a) as ArrayRef),
4761            ("b", Arc::new(b) as ArrayRef),
4762        ])
4763        .unwrap();
4764
4765        let mut buf = Vec::with_capacity(1024);
4766        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
4767        writer.write(&batch).unwrap();
4768        writer.close().unwrap();
4769
4770        let bytes = Bytes::from(buf);
4771        let options = ReadOptionsBuilder::new().with_page_index().build();
4772        let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
4773        let index = reader.metadata().offset_index().unwrap();
4774
4775        assert_eq!(index.len(), 1);
4776        assert_eq!(index[0].len(), 2); // 2 columns
4777        assert_eq!(index[0][0].page_locations().len(), 1); // 1 page
4778        assert_eq!(index[0][1].page_locations().len(), 1); // 1 page
4779    }
4780
4781    #[test]
4782    fn test_disabled_statistics_with_page() {
4783        let file_schema = Schema::new(vec![
4784            Field::new("a", DataType::Utf8, true),
4785            Field::new("b", DataType::Utf8, true),
4786        ]);
4787        let file_schema = Arc::new(file_schema);
4788
4789        let batch = RecordBatch::try_new(
4790            file_schema.clone(),
4791            vec![
4792                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
4793                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
4794            ],
4795        )
4796        .unwrap();
4797
4798        let props = WriterProperties::builder()
4799            .set_statistics_enabled(EnabledStatistics::None)
4800            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
4801            .build();
4802
4803        let mut buf = Vec::with_capacity(1024);
4804        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
4805        writer.write(&batch).unwrap();
4806
4807        let metadata = writer.close().unwrap();
4808        assert_eq!(metadata.num_row_groups(), 1);
4809        let row_group = metadata.row_group(0);
4810        assert_eq!(row_group.num_columns(), 2);
4811        // Column "a" has both offset and column index, as requested
4812        assert!(row_group.column(0).offset_index_offset().is_some());
4813        assert!(row_group.column(0).column_index_offset().is_some());
4814        // Column "b" should only have offset index
4815        assert!(row_group.column(1).offset_index_offset().is_some());
4816        assert!(row_group.column(1).column_index_offset().is_none());
4817
4818        let options = ReadOptionsBuilder::new().with_page_index().build();
4819        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
4820
4821        let row_group = reader.get_row_group(0).unwrap();
4822        let a_col = row_group.metadata().column(0);
4823        let b_col = row_group.metadata().column(1);
4824
4825        // Column chunk of column "a" should have chunk level statistics
4826        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
4827            let min = byte_array_stats.min_opt().unwrap();
4828            let max = byte_array_stats.max_opt().unwrap();
4829
4830            assert_eq!(min.as_bytes(), b"a");
4831            assert_eq!(max.as_bytes(), b"d");
4832        } else {
4833            panic!("expecting Statistics::ByteArray");
4834        }
4835
4836        // The column chunk for column "b" shouldn't have statistics
4837        assert!(b_col.statistics().is_none());
4838
4839        let offset_index = reader.metadata().offset_index().unwrap();
4840        assert_eq!(offset_index.len(), 1); // 1 row group
4841        assert_eq!(offset_index[0].len(), 2); // 2 columns
4842
4843        let column_index = reader.metadata().column_index().unwrap();
4844        assert_eq!(column_index.len(), 1); // 1 row group
4845        assert_eq!(column_index[0].len(), 2); // 2 columns
4846
4847        let a_idx = &column_index[0][0];
4848        assert!(
4849            matches!(a_idx, ColumnIndexMetaData::BYTE_ARRAY(_)),
4850            "{a_idx:?}"
4851        );
4852        let b_idx = &column_index[0][1];
4853        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
4854    }
4855
4856    #[test]
4857    fn test_disabled_statistics_with_chunk() {
4858        let file_schema = Schema::new(vec![
4859            Field::new("a", DataType::Utf8, true),
4860            Field::new("b", DataType::Utf8, true),
4861        ]);
4862        let file_schema = Arc::new(file_schema);
4863
4864        let batch = RecordBatch::try_new(
4865            file_schema.clone(),
4866            vec![
4867                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
4868                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
4869            ],
4870        )
4871        .unwrap();
4872
4873        let props = WriterProperties::builder()
4874            .set_statistics_enabled(EnabledStatistics::None)
4875            .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
4876            .build();
4877
4878        let mut buf = Vec::with_capacity(1024);
4879        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
4880        writer.write(&batch).unwrap();
4881
4882        let metadata = writer.close().unwrap();
4883        assert_eq!(metadata.num_row_groups(), 1);
4884        let row_group = metadata.row_group(0);
4885        assert_eq!(row_group.num_columns(), 2);
4886        // Column "a" should only have offset index
4887        assert!(row_group.column(0).offset_index_offset().is_some());
4888        assert!(row_group.column(0).column_index_offset().is_none());
4889        // Column "b" should only have offset index
4890        assert!(row_group.column(1).offset_index_offset().is_some());
4891        assert!(row_group.column(1).column_index_offset().is_none());
4892
4893        let options = ReadOptionsBuilder::new().with_page_index().build();
4894        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
4895
4896        let row_group = reader.get_row_group(0).unwrap();
4897        let a_col = row_group.metadata().column(0);
4898        let b_col = row_group.metadata().column(1);
4899
4900        // Column chunk of column "a" should have chunk level statistics
4901        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
4902            let min = byte_array_stats.min_opt().unwrap();
4903            let max = byte_array_stats.max_opt().unwrap();
4904
4905            assert_eq!(min.as_bytes(), b"a");
4906            assert_eq!(max.as_bytes(), b"d");
4907        } else {
4908            panic!("expecting Statistics::ByteArray");
4909        }
4910
4911        // The column chunk for column "b"  shouldn't have statistics
4912        assert!(b_col.statistics().is_none());
4913
4914        let column_index = reader.metadata().column_index().unwrap();
4915        assert_eq!(column_index.len(), 1); // 1 row group
4916        assert_eq!(column_index[0].len(), 2); // 2 columns
4917
4918        let a_idx = &column_index[0][0];
4919        assert!(matches!(a_idx, ColumnIndexMetaData::NONE), "{a_idx:?}");
4920        let b_idx = &column_index[0][1];
4921        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
4922    }
4923
4924    #[test]
4925    fn test_arrow_writer_skip_metadata() {
4926        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
4927        let file_schema = Arc::new(batch_schema.clone());
4928
4929        let batch = RecordBatch::try_new(
4930            Arc::new(batch_schema),
4931            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4932        )
4933        .unwrap();
4934        let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
4935
4936        let mut buf = Vec::with_capacity(1024);
4937        let mut writer =
4938            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
4939        writer.write(&batch).unwrap();
4940        writer.close().unwrap();
4941
4942        let bytes = Bytes::from(buf);
4943        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
4944        assert_eq!(file_schema, *reader_builder.schema());
4945        if let Some(key_value_metadata) = reader_builder
4946            .metadata()
4947            .file_metadata()
4948            .key_value_metadata()
4949        {
4950            assert!(
4951                !key_value_metadata
4952                    .iter()
4953                    .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY)
4954            );
4955        }
4956    }
4957
4958    #[test]
4959    fn test_arrow_writer_skip_path_in_schema() {
4960        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
4961        let file_schema = Arc::new(batch_schema.clone());
4962
4963        let batch = RecordBatch::try_new(
4964            Arc::new(batch_schema),
4965            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4966        )
4967        .unwrap();
4968
4969        // default options should still write path_in_schema
4970        let skip_options = ArrowWriterOptions::new();
4971
4972        let mut buf = Vec::with_capacity(1024);
4973        let mut writer =
4974            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
4975        writer.write(&batch).unwrap();
4976        writer.close().unwrap();
4977
4978        // override to not write path_in_schema
4979        let skip_options = ArrowWriterOptions::new().with_properties(
4980            WriterProperties::builder()
4981                .set_write_path_in_schema(false)
4982                .build(),
4983        );
4984
4985        let mut buf2 = Vec::with_capacity(1024);
4986        let mut writer =
4987            ArrowWriter::try_new_with_options(&mut buf2, file_schema.clone(), skip_options)
4988                .unwrap();
4989        writer.write(&batch).unwrap();
4990        writer.close().unwrap();
4991
4992        // buf2 should be a bit smaller due to lack of path_in_schema
4993        assert!(buf.len() > buf2.len());
4994    }
4995
4996    #[test]
4997    fn mismatched_schemas() {
4998        let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
4999        let file_schema = Arc::new(Schema::new(vec![Field::new(
5000            "temperature",
5001            DataType::Float64,
5002            false,
5003        )]));
5004
5005        let batch = RecordBatch::try_new(
5006            Arc::new(batch_schema),
5007            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
5008        )
5009        .unwrap();
5010
5011        let mut buf = Vec::with_capacity(1024);
5012        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
5013
5014        let err = writer.write(&batch).unwrap_err().to_string();
5015        assert_eq!(
5016            err,
5017            "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
5018        );
5019    }
5020
5021    #[test]
5022    // https://github.com/apache/arrow-rs/issues/6988
5023    fn test_roundtrip_empty_schema() {
5024        // create empty record batch with empty schema
5025        let empty_batch = RecordBatch::try_new_with_options(
5026            Arc::new(Schema::empty()),
5027            vec![],
5028            &RecordBatchOptions::default().with_row_count(Some(0)),
5029        )
5030        .unwrap();
5031
5032        // write to parquet
5033        let mut parquet_bytes: Vec<u8> = Vec::new();
5034        let mut writer =
5035            ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
5036        writer.write(&empty_batch).unwrap();
5037        writer.close().unwrap();
5038
5039        // read from parquet
5040        let bytes = Bytes::from(parquet_bytes);
5041        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
5042        assert_eq!(reader.schema(), &empty_batch.schema());
5043        let batches: Vec<_> = reader
5044            .build()
5045            .unwrap()
5046            .collect::<ArrowResult<Vec<_>>>()
5047            .unwrap();
5048        assert_eq!(batches.len(), 0);
5049    }
5050
5051    #[test]
5052    fn test_page_stats_not_written_by_default() {
5053        let string_field = Field::new("a", DataType::Utf8, false);
5054        let schema = Schema::new(vec![string_field]);
5055        let raw_string_values = vec!["Blart Versenwald III"];
5056        let string_values = StringArray::from(raw_string_values.clone());
5057        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();
5058
5059        let props = WriterProperties::builder()
5060            .set_statistics_enabled(EnabledStatistics::Page)
5061            .set_dictionary_enabled(false)
5062            .set_encoding(Encoding::PLAIN)
5063            .set_compression(crate::basic::Compression::UNCOMPRESSED)
5064            .build();
5065
5066        let file = roundtrip_opts(&batch, props);
5067
5068        // read file and decode page headers
5069        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
5070
5071        // decode first page header
5072        let first_page = &file[4..];
5073        let mut prot = ThriftSliceInputProtocol::new(first_page);
5074        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
5075        let stats = hdr.data_page_header.unwrap().statistics;
5076
5077        assert!(stats.is_none());
5078    }
5079
5080    #[test]
5081    fn test_page_stats_when_enabled() {
5082        let string_field = Field::new("a", DataType::Utf8, false);
5083        let schema = Schema::new(vec![string_field]);
5084        let raw_string_values = vec!["Blart Versenwald III", "Andrew Lamb"];
5085        let string_values = StringArray::from(raw_string_values.clone());
5086        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();
5087
5088        let props = WriterProperties::builder()
5089            .set_statistics_enabled(EnabledStatistics::Page)
5090            .set_dictionary_enabled(false)
5091            .set_encoding(Encoding::PLAIN)
5092            .set_write_page_header_statistics(true)
5093            .set_compression(crate::basic::Compression::UNCOMPRESSED)
5094            .build();
5095
5096        let file = roundtrip_opts(&batch, props);
5097
5098        // read file and decode page headers
5099        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
5100
5101        // decode first page header
5102        let first_page = &file[4..];
5103        let mut prot = ThriftSliceInputProtocol::new(first_page);
5104        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
5105        let stats = hdr.data_page_header.unwrap().statistics;
5106
5107        let stats = stats.unwrap();
5108        // check that min/max were actually written to the page
5109        assert!(stats.is_max_value_exact.unwrap());
5110        assert!(stats.is_min_value_exact.unwrap());
5111        assert_eq!(stats.max_value.unwrap(), "Blart Versenwald III".as_bytes());
5112        assert_eq!(stats.min_value.unwrap(), "Andrew Lamb".as_bytes());
5113    }
5114
5115    #[test]
5116    fn test_page_stats_truncation() {
5117        let string_field = Field::new("a", DataType::Utf8, false);
5118        let binary_field = Field::new("b", DataType::Binary, false);
5119        let schema = Schema::new(vec![string_field, binary_field]);
5120
5121        let raw_string_values = vec!["Blart Versenwald III"];
5122        let raw_binary_values = [b"Blart Versenwald III".to_vec()];
5123        let raw_binary_value_refs = raw_binary_values
5124            .iter()
5125            .map(|x| x.as_slice())
5126            .collect::<Vec<_>>();
5127
5128        let string_values = StringArray::from(raw_string_values.clone());
5129        let binary_values = BinaryArray::from(raw_binary_value_refs);
5130        let batch = RecordBatch::try_new(
5131            Arc::new(schema),
5132            vec![Arc::new(string_values), Arc::new(binary_values)],
5133        )
5134        .unwrap();
5135
5136        let props = WriterProperties::builder()
5137            .set_statistics_truncate_length(Some(2))
5138            .set_dictionary_enabled(false)
5139            .set_encoding(Encoding::PLAIN)
5140            .set_write_page_header_statistics(true)
5141            .set_compression(crate::basic::Compression::UNCOMPRESSED)
5142            .build();
5143
5144        let file = roundtrip_opts(&batch, props);
5145
5146        // read file and decode page headers
5147        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
5148
5149        // decode first page header
5150        let first_page = &file[4..];
5151        let mut prot = ThriftSliceInputProtocol::new(first_page);
5152        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
5153        let stats = hdr.data_page_header.unwrap().statistics;
5154        assert!(stats.is_some());
5155        let stats = stats.unwrap();
5156        // check that min/max were properly truncated
5157        assert!(!stats.is_max_value_exact.unwrap());
5158        assert!(!stats.is_min_value_exact.unwrap());
5159        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
5160        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
5161
5162        // check second page now
5163        let second_page = &prot.as_slice()[hdr.compressed_page_size as usize..];
5164        let mut prot = ThriftSliceInputProtocol::new(second_page);
5165        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
5166        let stats = hdr.data_page_header.unwrap().statistics;
5167        assert!(stats.is_some());
5168        let stats = stats.unwrap();
5169        // check that min/max were properly truncated
5170        assert!(!stats.is_max_value_exact.unwrap());
5171        assert!(!stats.is_min_value_exact.unwrap());
5172        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
5173        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
5174    }
5175
5176    #[test]
5177    fn test_page_encoding_statistics_roundtrip() {
5178        let batch_schema = Schema::new(vec![Field::new(
5179            "int32",
5180            arrow_schema::DataType::Int32,
5181            false,
5182        )]);
5183
5184        let batch = RecordBatch::try_new(
5185            Arc::new(batch_schema.clone()),
5186            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
5187        )
5188        .unwrap();
5189
5190        let mut file: File = tempfile::tempfile().unwrap();
5191        let mut writer = ArrowWriter::try_new(&mut file, Arc::new(batch_schema), None).unwrap();
5192        writer.write(&batch).unwrap();
5193        let file_metadata = writer.close().unwrap();
5194
5195        assert_eq!(file_metadata.num_row_groups(), 1);
5196        assert_eq!(file_metadata.row_group(0).num_columns(), 1);
5197        assert!(
5198            file_metadata
5199                .row_group(0)
5200                .column(0)
5201                .page_encoding_stats()
5202                .is_some()
5203        );
5204        let chunk_page_stats = file_metadata
5205            .row_group(0)
5206            .column(0)
5207            .page_encoding_stats()
5208            .unwrap();
5209
5210        // check that the read metadata is also correct
5211        let options = ReadOptionsBuilder::new()
5212            .with_page_index()
5213            .with_encoding_stats_as_mask(false)
5214            .build();
5215        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
5216
5217        let rowgroup = reader.get_row_group(0).expect("row group missing");
5218        assert_eq!(rowgroup.num_columns(), 1);
5219        let column = rowgroup.metadata().column(0);
5220        assert!(column.page_encoding_stats().is_some());
5221        let file_page_stats = column.page_encoding_stats().unwrap();
5222        assert_eq!(chunk_page_stats, file_page_stats);
5223    }
5224
5225    #[test]
5226    fn test_different_dict_page_size_limit() {
5227        let array = Arc::new(Int64Array::from_iter(0..1024 * 1024));
5228        let schema = Arc::new(Schema::new(vec![
5229            Field::new("col0", arrow_schema::DataType::Int64, false),
5230            Field::new("col1", arrow_schema::DataType::Int64, false),
5231        ]));
5232        let batch =
5233            arrow_array::RecordBatch::try_new(schema.clone(), vec![array.clone(), array]).unwrap();
5234
5235        let props = WriterProperties::builder()
5236            .set_dictionary_page_size_limit(1024 * 1024)
5237            .set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 1024 * 1024 * 4)
5238            .build();
5239        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
5240        writer.write(&batch).unwrap();
5241        let data = Bytes::from(writer.into_inner().unwrap());
5242
5243        let mut metadata = ParquetMetaDataReader::new();
5244        metadata.try_parse(&data).unwrap();
5245        let metadata = metadata.finish().unwrap();
5246        let col0_meta = metadata.row_group(0).column(0);
5247        let col1_meta = metadata.row_group(0).column(1);
5248
5249        let get_dict_page_size = move |meta: &ColumnChunkMetaData| {
5250            let mut reader =
5251                SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
5252            let page = reader.get_next_page().unwrap().unwrap();
5253            match page {
5254                Page::DictionaryPage { buf, .. } => buf.len(),
5255                _ => panic!("expected DictionaryPage"),
5256            }
5257        };
5258
5259        assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
5260        assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
5261    }
5262
5263    #[test]
5264    fn test_arrow_writer_granular_mode_roundtrip() {
5265        // Granular mode subdivides chunks and writes more pages than the
5266        // default batched path. Make sure the data we write back is
5267        // bit-identical to what went in — page-count assertions elsewhere
5268        // only prove pages were cut, not that the encoded data is correct.
5269        //
5270        // Mix value sizes so that the cumulative-byte-budget cutoff
5271        // lands mid-chunk, exercising both batched and granular paths
5272        // within the same `write_batch_internal` call.
5273        let small = "tiny".to_string();
5274        let big = "x".repeat(64 * 1024);
5275        let strings: Vec<String> = (0..256)
5276            .map(|i| {
5277                if i % 16 == 0 {
5278                    big.clone()
5279                } else {
5280                    small.clone()
5281                }
5282            })
5283            .collect();
5284
5285        let schema = Arc::new(Schema::new(vec![Field::new(
5286            "col",
5287            ArrowDataType::Utf8,
5288            false,
5289        )]));
5290        let batch = RecordBatch::try_new(
5291            schema.clone(),
5292            vec![Arc::new(StringArray::from(strings.clone())) as _],
5293        )
5294        .unwrap();
5295
5296        let props = WriterProperties::builder()
5297            .set_dictionary_enabled(false)
5298            .set_data_page_size_limit(16 * 1024)
5299            .build();
5300        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
5301        writer.write(&batch).unwrap();
5302        let data = Bytes::from(writer.into_inner().unwrap());
5303
5304        let mut reader = ParquetRecordBatchReader::try_new(data, 1024).unwrap();
5305        let read = reader.next().unwrap().unwrap();
5306        assert!(reader.next().is_none(), "expected one batch");
5307        let col = read
5308            .column(0)
5309            .as_any()
5310            .downcast_ref::<StringArray>()
5311            .unwrap();
5312        assert_eq!(col.len(), strings.len());
5313        for (i, expected) in strings.iter().enumerate() {
5314            assert_eq!(
5315                col.value(i),
5316                expected.as_str(),
5317                "value mismatch at index {i}"
5318            );
5319        }
5320    }
5321
5322    #[test]
5323    fn test_arrow_writer_all_null_string_column() {
5324        // The `LevelDataRef::value_count` Uniform branch with
5325        // `value != max_def` (entirely-null chunk) must return 0 so the
5326        // sub-batch sizer short-circuits to batch mode without trying
5327        // to estimate byte budgets for non-existent values.
5328        let num_rows = 1024;
5329        let schema = Arc::new(Schema::new(vec![Field::new(
5330            "col",
5331            ArrowDataType::Utf8,
5332            true,
5333        )]));
5334        let nulls: Vec<Option<&str>> = vec![None; num_rows];
5335        let batch = RecordBatch::try_new(
5336            schema.clone(),
5337            vec![Arc::new(StringArray::from(nulls)) as _],
5338        )
5339        .unwrap();
5340
5341        let props = WriterProperties::builder()
5342            .set_dictionary_enabled(false)
5343            .set_data_page_size_limit(16 * 1024)
5344            .build();
5345        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
5346        writer.write(&batch).unwrap();
5347        let data = Bytes::from(writer.into_inner().unwrap());
5348
5349        // Re-parse the file: row group has one column, every row is
5350        // null, all data pages report `num_rows / page_count` rows.
5351        let mut metadata = ParquetMetaDataReader::new();
5352        metadata.try_parse(&data).unwrap();
5353        let metadata = metadata.finish().unwrap();
5354        let row_group = metadata.row_group(0);
5355        let col_meta = row_group.column(0);
5356        assert_eq!(row_group.num_rows() as usize, num_rows);
5357        // Statistics record `null_count = num_rows` — proves every value
5358        // was written as null.
5359        if let Some(stats) = col_meta.statistics() {
5360            assert_eq!(
5361                stats.null_count_opt().unwrap_or(0) as usize,
5362                num_rows,
5363                "expected all-null column to report null_count = num_rows"
5364            );
5365        }
5366
5367        let mut reader =
5368            SerializedPageReader::new(Arc::new(data.clone()), col_meta, num_rows, None).unwrap();
5369        let mut total_values = 0u32;
5370        while let Some(page) = reader.get_next_page().unwrap() {
5371            if matches!(page, Page::DataPage { .. } | Page::DataPageV2 { .. }) {
5372                total_values += page.num_values();
5373            }
5374        }
5375        assert_eq!(
5376            total_values as usize, num_rows,
5377            "expected every level position to be represented in some page"
5378        );
5379    }
5380
5381    struct WriteBatchesShape {
5382        num_batches: usize,
5383        rows_per_batch: usize,
5384        row_size: usize,
5385    }
5386
5387    /// Helper function to write batches with the provided `WriteBatchesShape` into an `ArrowWriter`
5388    fn write_batches(
5389        WriteBatchesShape {
5390            num_batches,
5391            rows_per_batch,
5392            row_size,
5393        }: WriteBatchesShape,
5394        props: WriterProperties,
5395    ) -> ParquetRecordBatchReaderBuilder<File> {
5396        let schema = Arc::new(Schema::new(vec![Field::new(
5397            "str",
5398            ArrowDataType::Utf8,
5399            false,
5400        )]));
5401        let file = tempfile::tempfile().unwrap();
5402        let mut writer =
5403            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
5404
5405        for batch_idx in 0..num_batches {
5406            let strings: Vec<String> = (0..rows_per_batch)
5407                .map(|i| format!("{:0>width$}", batch_idx * 10 + i, width = row_size))
5408                .collect();
5409            let array = StringArray::from(strings);
5410            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
5411            writer.write(&batch).unwrap();
5412        }
5413        writer.close().unwrap();
5414        ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
5415    }
5416
5417    #[test]
5418    // When both limits are None, all data should go into a single row group
5419    fn test_row_group_limit_none_writes_single_row_group() {
5420        let props = WriterProperties::builder()
5421            .set_max_row_group_row_count(None)
5422            .set_max_row_group_bytes(None)
5423            .build();
5424
5425        let builder = write_batches(
5426            WriteBatchesShape {
5427                num_batches: 1,
5428                rows_per_batch: 1000,
5429                row_size: 4,
5430            },
5431            props,
5432        );
5433
5434        assert_eq!(
5435            &row_group_sizes(builder.metadata()),
5436            &[1000],
5437            "With no limits, all rows should be in a single row group"
5438        );
5439    }
5440
5441    #[test]
5442    // When only max_row_group_size is set, respect the row limit
5443    fn test_row_group_limit_rows_only() {
5444        let props = WriterProperties::builder()
5445            .set_max_row_group_row_count(Some(300))
5446            .set_max_row_group_bytes(None)
5447            .build();
5448
5449        let builder = write_batches(
5450            WriteBatchesShape {
5451                num_batches: 1,
5452                rows_per_batch: 1000,
5453                row_size: 4,
5454            },
5455            props,
5456        );
5457
5458        assert_eq!(
5459            &row_group_sizes(builder.metadata()),
5460            &[300, 300, 300, 100],
5461            "Row groups should be split by row count"
5462        );
5463    }
5464
5465    #[test]
5466    // When only max_row_group_bytes is set, respect the byte limit
5467    fn test_row_group_limit_bytes_only() {
5468        let props = WriterProperties::builder()
5469            .set_max_row_group_row_count(None)
5470            // Set byte limit to approximately fit ~30 rows worth of data (~100 bytes each)
5471            .set_max_row_group_bytes(Some(3500))
5472            .build();
5473
5474        let builder = write_batches(
5475            WriteBatchesShape {
5476                num_batches: 10,
5477                rows_per_batch: 10,
5478                row_size: 100,
5479            },
5480            props,
5481        );
5482
5483        let sizes = row_group_sizes(builder.metadata());
5484
5485        assert!(
5486            sizes.len() > 1,
5487            "Should have multiple row groups due to byte limit, got {sizes:?}",
5488        );
5489
5490        let total_rows: i64 = sizes.iter().sum();
5491        assert_eq!(total_rows, 100, "Total rows should be preserved");
5492    }
5493
5494    #[test]
5495    // If an in-progress row group is already oversized, it should be flushed before writing more.
5496    fn test_row_group_limit_bytes_flushes_when_current_group_already_too_large() {
5497        let schema = Arc::new(Schema::new(vec![Field::new(
5498            "str",
5499            ArrowDataType::Utf8,
5500            false,
5501        )]));
5502        let file = tempfile::tempfile().unwrap();
5503
5504        // Start with no byte limit so we can intentionally build an oversized in-progress row group.
5505        let props = WriterProperties::builder()
5506            .set_max_row_group_row_count(None)
5507            .set_max_row_group_bytes(None)
5508            .build();
5509        let mut writer =
5510            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
5511
5512        let first_array = StringArray::from(
5513            (0..10)
5514                .map(|i| format!("{:0>100}", i))
5515                .collect::<Vec<String>>(),
5516        );
5517        let first_batch =
5518            RecordBatch::try_new(schema.clone(), vec![Arc::new(first_array)]).unwrap();
5519        writer.write(&first_batch).unwrap();
5520        assert_eq!(writer.in_progress_rows(), 10);
5521
5522        // Tighten the limit below the current in-progress bytes to exercise:
5523        // `if current_bytes >= max_bytes { self.flush()?; ... }`
5524        writer.max_row_group_bytes = Some(1);
5525
5526        let second_array = StringArray::from(vec!["x".to_string()]);
5527        let second_batch =
5528            RecordBatch::try_new(schema.clone(), vec![Arc::new(second_array)]).unwrap();
5529        writer.write(&second_batch).unwrap();
5530        writer.close().unwrap();
5531        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
5532
5533        assert_eq!(
5534            &row_group_sizes(builder.metadata()),
5535            &[10, 1],
5536            "The second write should flush an oversized in-progress row group first",
5537        );
5538    }
5539
5540    #[test]
5541    // When both limits are set, the row limit triggers first
5542    fn test_row_group_limit_both_row_wins_single_batch() {
5543        let props = WriterProperties::builder()
5544            .set_max_row_group_row_count(Some(200)) // Will trigger at 200 rows
5545            .set_max_row_group_bytes(Some(1024 * 1024)) // 1MB - won't trigger for small int data
5546            .build();
5547
5548        let builder = write_batches(
5549            WriteBatchesShape {
5550                num_batches: 1,
5551                row_size: 4,
5552                rows_per_batch: 1000,
5553            },
5554            props,
5555        );
5556
5557        assert_eq!(
5558            &row_group_sizes(builder.metadata()),
5559            &[200, 200, 200, 200, 200],
5560            "Row limit should trigger before byte limit"
5561        );
5562    }
5563
5564    #[test]
5565    // When both limits are set, the row limit triggers first
5566    fn test_row_group_limit_both_row_wins_multiple_batches() {
5567        let props = WriterProperties::builder()
5568            .set_max_row_group_row_count(Some(5)) // Will trigger every 5 rows
5569            .set_max_row_group_bytes(Some(9999)) // Won't trigger
5570            .build();
5571
5572        let builder = write_batches(
5573            WriteBatchesShape {
5574                num_batches: 10,
5575                rows_per_batch: 10,
5576                row_size: 100,
5577            },
5578            props,
5579        );
5580
5581        assert_eq!(
5582            &row_group_sizes(builder.metadata()),
5583            &[5; 20],
5584            "Row limit should trigger before byte limit"
5585        );
5586    }
5587
5588    #[test]
5589    // When both limits are set, the byte limit triggers first
5590    fn test_row_group_limit_both_bytes_wins() {
5591        let props = WriterProperties::builder()
5592            .set_max_row_group_row_count(Some(1000)) // Won't trigger for 100 rows
5593            .set_max_row_group_bytes(Some(3500)) // Will trigger at ~30-35 rows
5594            .build();
5595
5596        let builder = write_batches(
5597            WriteBatchesShape {
5598                num_batches: 10,
5599                rows_per_batch: 10,
5600                row_size: 100,
5601            },
5602            props,
5603        );
5604
5605        let sizes = row_group_sizes(builder.metadata());
5606
5607        assert!(
5608            sizes.len() > 1,
5609            "Byte limit should trigger before row limit, got {sizes:?}",
5610        );
5611
5612        assert!(
5613            sizes.iter().all(|&s| s < 1000),
5614            "No row group should hit the row limit"
5615        );
5616
5617        let total_rows: i64 = sizes.iter().sum();
5618        assert_eq!(total_rows, 100, "Total rows should be preserved");
5619    }
5620
5621    #[test]
5622    fn arrow_column_chunk_close_mut_drops_column_index() {
5623        use crate::arrow::ArrowSchemaConverter;
5624        use crate::file::writer::SerializedFileWriter;
5625
5626        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
5627        let props = Arc::new(
5628            WriterProperties::builder()
5629                .set_statistics_enabled(EnabledStatistics::Page)
5630                .build(),
5631        );
5632        let parquet_schema = ArrowSchemaConverter::new()
5633            .with_coerce_types(props.coerce_types())
5634            .convert(&schema)
5635            .unwrap();
5636
5637        let mut buf = Vec::with_capacity(1024);
5638        let mut writer =
5639            SerializedFileWriter::new(&mut buf, parquet_schema.root_schema_ptr(), props.clone())
5640                .unwrap();
5641
5642        let factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(&schema));
5643        let mut col_writers = factory.create_column_writers(0).unwrap();
5644        let arr: ArrayRef = Arc::new(Int32Array::from_iter_values(0..64));
5645        for leaves in compute_leaves(schema.field(0), &arr).unwrap() {
5646            col_writers[0].write(&leaves).unwrap();
5647        }
5648        let mut chunk = col_writers.pop().unwrap().close().unwrap();
5649
5650        // Immutable accessor exposes the close result produced at close time.
5651        assert!(
5652            chunk.close().column_index.is_some(),
5653            "EnabledStatistics::Page should produce a column_index"
5654        );
5655
5656        // Mutable accessor lets callers drop the page-level index before append.
5657        chunk.close_mut().column_index = None;
5658        assert!(chunk.close().column_index.is_none());
5659
5660        let mut rg = writer.next_row_group().unwrap();
5661        chunk.append_to_row_group(&mut rg).unwrap();
5662        rg.close().unwrap();
5663        let file_meta = writer.close().unwrap();
5664
5665        // After dropping column_index, the resulting file records no column
5666        // index offset/length for this chunk.
5667        let cc = file_meta.row_group(0).column(0);
5668        assert!(cc.column_index_range().is_none());
5669    }
5670
5671    /// Writes a single-column RecordBatch to an in-memory Parquet buffer.
5672    fn write_column_to_bytes(array: ArrayRef) -> Bytes {
5673        let schema = Arc::new(Schema::new(vec![Field::new(
5674            "col",
5675            array.data_type().clone(),
5676            true,
5677        )]));
5678        let buf = get_bytes_after_close(
5679            schema.clone(),
5680            &RecordBatch::try_new(schema, vec![array]).unwrap(),
5681        );
5682        Bytes::from(buf)
5683    }
5684
5685    /// Reads column 0 from a single-row-group Parquet buffer, projecting it with the given schema.
5686    /// Passing a flat schema when the buffer was written from a REE array lets callers decode
5687    /// the physical values without the run-end encoding wrapper.
5688    fn read_column_with_schema(bytes: Bytes, schema: SchemaRef) -> ArrayRef {
5689        let opts = crate::arrow::arrow_reader::ArrowReaderOptions::new().with_schema(schema);
5690        ParquetRecordBatchReaderBuilder::try_new_with_options(bytes, opts)
5691            .unwrap()
5692            .build()
5693            .unwrap()
5694            .next()
5695            .unwrap()
5696            .unwrap()
5697            .column(0)
5698            .clone()
5699    }
5700
5701    fn ree_write_read_roundtrip(ree: ArrayRef, flat: ArrayRef) {
5702        let flat_schema = Arc::new(Schema::new(vec![Field::new(
5703            "col",
5704            flat.data_type().clone(),
5705            true,
5706        )]));
5707        let ree_bytes = write_column_to_bytes(ree);
5708        let flat_bytes = write_column_to_bytes(flat.clone());
5709        assert_eq!(
5710            ree_bytes, flat_bytes,
5711            "REE and flat bytes should be identical"
5712        );
5713
5714        let decoded_ree = read_column_with_schema(ree_bytes, flat_schema.clone());
5715        let decoded_flat = read_column_with_schema(flat_bytes, flat_schema);
5716
5717        assert_eq!(decoded_ree.as_ref(), flat.as_ref());
5718        assert_eq!(decoded_ree.as_ref(), decoded_flat.as_ref());
5719    }
5720
5721    #[test]
5722    fn ree_string() {
5723        let ree: ArrayRef = Arc::new(
5724            [Some("a"), Some("a"), None, Some("b"), Some("b")]
5725                .into_iter()
5726                .collect::<Int32RunArray>(),
5727        );
5728        let flat: ArrayRef = Arc::new(StringArray::from(vec![
5729            Some("a"),
5730            Some("a"),
5731            None,
5732            Some("b"),
5733            Some("b"),
5734        ]));
5735        ree_write_read_roundtrip(ree, flat);
5736    }
5737
5738    #[test]
5739    fn ree_int32() {
5740        let mut b = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
5741        for v in [Some(1), Some(1), None, Some(2), Some(2)] {
5742            b.append_option(v);
5743        }
5744        let ree: ArrayRef = Arc::new(b.finish());
5745        let flat: ArrayRef = Arc::new(Int32Array::from(vec![
5746            Some(1),
5747            Some(1),
5748            None,
5749            Some(2),
5750            Some(2),
5751        ]));
5752        ree_write_read_roundtrip(ree, flat);
5753    }
5754
5755    #[test]
5756    fn ree_bool() {
5757        // run_ends [3, 5, 7] → [T,T,T, null,null, F,F]
5758        let ree: ArrayRef = Arc::new(
5759            RunArray::try_new(
5760                &Int32Array::from(vec![3, 5, 7]),
5761                &BooleanArray::from(vec![Some(true), None, Some(false)]),
5762            )
5763            .unwrap(),
5764        );
5765        let flat: ArrayRef = Arc::new(BooleanArray::from(vec![
5766            Some(true),
5767            Some(true),
5768            Some(true),
5769            None,
5770            None,
5771            Some(false),
5772            Some(false),
5773        ]));
5774        ree_write_read_roundtrip(ree, flat);
5775    }
5776
5777    #[test]
5778    fn ree_fixed_size_binary() {
5779        let mk = |vals: &[Option<&[u8]>]| -> FixedSizeBinaryArray {
5780            let mut b = FixedSizeBinaryBuilder::new(2);
5781            for v in vals {
5782                match v {
5783                    Some(x) => b.append_value(x).unwrap(),
5784                    None => b.append_null(),
5785                }
5786            }
5787            b.finish()
5788        };
5789        // run_ends [2, 4, 6] → [aa,aa, null,null, bb,bb]
5790        let ree: ArrayRef = Arc::new(
5791            RunArray::try_new(
5792                &Int32Array::from(vec![2, 4, 6]),
5793                &mk(&[Some(b"aa"), None, Some(b"bb")]),
5794            )
5795            .unwrap(),
5796        );
5797        let flat: ArrayRef = Arc::new(mk(&[
5798            Some(b"aa"),
5799            Some(b"aa"),
5800            None,
5801            None,
5802            Some(b"bb"),
5803            Some(b"bb"),
5804        ]));
5805        ree_write_read_roundtrip(ree, flat);
5806    }
5807
5808    #[test]
5809    fn ree_single_run() {
5810        let ree: ArrayRef = Arc::new(["x", "x", "x"].into_iter().collect::<Int32RunArray>());
5811        let flat: ArrayRef = Arc::new(StringArray::from(vec!["x", "x", "x"]));
5812        ree_write_read_roundtrip(ree, flat);
5813    }
5814
5815    #[test]
5816    fn ree_float32() {
5817        // run_ends [2, 4, 5] → [1.0, 1.0, null, null, 2.5]
5818        let ree: ArrayRef = Arc::new(
5819            RunArray::try_new(
5820                &Int32Array::from(vec![2, 4, 5]),
5821                &Float32Array::from(vec![Some(1.0_f32), None, Some(2.5_f32)]),
5822            )
5823            .unwrap(),
5824        );
5825        let flat: ArrayRef = Arc::new(Float32Array::from(vec![
5826            Some(1.0_f32),
5827            Some(1.0_f32),
5828            None,
5829            None,
5830            Some(2.5_f32),
5831        ]));
5832        ree_write_read_roundtrip(ree, flat);
5833    }
5834
5835    #[test]
5836    fn ree_sliced() {
5837        // A sliced (non-zero offset) REE array: verify that get_physical_index
5838        // correctly accounts for the logical offset when expanding.
5839        // Full array: run_ends [3, 5, 7] → [a,a,a, b,b, c,c]
5840        // After slice(2, 5) the logical view is [a, b, b, c, c].
5841        let full: ArrayRef = Arc::new(
5842            RunArray::try_new(
5843                &Int32Array::from(vec![3, 5, 7]),
5844                &StringArray::from(vec!["a", "b", "c"]),
5845            )
5846            .unwrap(),
5847        );
5848        let sliced = full.slice(2, 5);
5849        let flat: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "b", "c", "c"]));
5850        ree_write_read_roundtrip(sliced, flat);
5851    }
5852
5853    #[test]
5854    fn ree_struct_with_ree_child() {
5855        // Struct with a REE string field and a REE int field — confirms
5856        // recursion visits every child and each collapses to the right leaf type.
5857        let run_ends = Int32Array::from(vec![2i32, 3, 5]);
5858
5859        let col_a: ArrayRef = Arc::new(
5860            RunArray::try_new(
5861                &run_ends,
5862                &StringArray::from(vec![Some("foo"), None, Some("bar")]),
5863            )
5864            .unwrap(),
5865        );
5866        let col_b: ArrayRef = Arc::new(
5867            RunArray::try_new(&run_ends, &Int32Array::from(vec![Some(1), None, Some(2)])).unwrap(),
5868        );
5869
5870        let struct_array: ArrayRef = Arc::new(StructArray::new(
5871            Fields::from(vec![
5872                Field::new("a", col_a.data_type().clone(), true),
5873                Field::new("b", col_b.data_type().clone(), true),
5874            ]),
5875            vec![col_a, col_b],
5876            None,
5877        ));
5878
5879        let schema = Arc::new(Schema::new(vec![Field::new(
5880            "row",
5881            struct_array.data_type().clone(),
5882            true,
5883        )]));
5884        let batch = RecordBatch::try_new(schema.clone(), vec![struct_array]).unwrap();
5885
5886        let mut buf = Vec::new();
5887        let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap();
5888        writer.write(&batch).unwrap();
5889        let metadata = writer.close().unwrap();
5890
5891        let parquet_schema = metadata.file_metadata().schema_descr();
5892        assert_eq!(parquet_schema.num_columns(), 2);
5893        assert_eq!(
5894            parquet_schema.column(0).physical_type(),
5895            crate::basic::Type::BYTE_ARRAY
5896        );
5897        assert_eq!(parquet_schema.column(0).path().string(), "row.a");
5898        assert_eq!(
5899            parquet_schema.column(1).physical_type(),
5900            crate::basic::Type::INT32
5901        );
5902        assert_eq!(parquet_schema.column(1).path().string(), "row.b");
5903    }
5904}