Skip to main content

parquet/arrow/arrow_writer/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains writer which writes arrow data into parquet data.
19
20use crate::column::chunker::ContentDefinedChunker;
21
22use bytes::Bytes;
23use std::io::{Read, Write};
24use std::slice::Iter;
25use std::sync::{Arc, Mutex};
26use std::vec::IntoIter;
27
28use arrow_array::cast::AsArray;
29use arrow_array::types::*;
30use arrow_array::{ArrayRef, Int32Array, RecordBatch, RecordBatchWriter};
31use arrow_schema::{
32    ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef, TimeUnit,
33};
34
35use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};
36
37use crate::arrow::ArrowSchemaConverter;
38use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
39use crate::basic::PageType;
40use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
41use crate::column::page_encryption::PageEncryptor;
42use crate::column::writer::encoder::ColumnValueEncoder;
43use crate::column::writer::{
44    ColumnCloseResult, ColumnWriter, GenericColumnWriter, get_column_writer,
45};
46use crate::data_type::{ByteArray, FixedLenByteArray};
47#[cfg(feature = "encryption")]
48use crate::encryption::encrypt::FileEncryptor;
49use crate::errors::{ParquetError, Result};
50use crate::file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaData};
51use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
52use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
53use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
54use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
55use levels::{ArrayLevels, calculate_array_levels};
56
57mod byte_array;
58mod levels;
59
60#[doc(inline)]
61pub use crate::column::page_store::{
62    InMemoryPageStore, InMemoryPageStoreFactory, PageKey, PageStore, PageStoreArgs,
63    PageStoreFactory,
64};
65
66/// Encodes [`RecordBatch`] to parquet
67///
68/// Writes Arrow `RecordBatch`es to a Parquet writer. Multiple [`RecordBatch`] will be encoded
69/// to the same row group, up to `max_row_group_size` rows. Any remaining rows will be
70/// flushed on close, leading the final row group in the output file to potentially
71/// contain fewer than `max_row_group_size` rows
72///
73/// # Example: Writing `RecordBatch`es
74/// ```
75/// # use std::sync::Arc;
76/// # use bytes::Bytes;
77/// # use arrow_array::{ArrayRef, Int64Array};
78/// # use arrow_array::RecordBatch;
79/// # use parquet::arrow::arrow_writer::ArrowWriter;
80/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
81/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
82/// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
83///
84/// let mut buffer = Vec::new();
85/// let mut writer = ArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
86/// writer.write(&to_write).unwrap();
87/// writer.close().unwrap();
88///
89/// let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap();
90/// let read = reader.next().unwrap().unwrap();
91///
92/// assert_eq!(to_write, read);
93/// ```
94///
95/// # Memory Usage and Limiting
96///
97/// The nature of Parquet requires buffering of an entire row group before it can
98/// be flushed to the underlying writer. Data is mostly buffered in its encoded
99/// form, reducing memory usage. However, some data such as dictionary keys,
100/// large strings or very nested data may still result in non-trivial memory
101/// usage.
102///
103/// See Also:
104/// * [`ArrowWriter::memory_size`]: the current memory usage of the writer.
105/// * [`ArrowWriter::in_progress_size`]: Estimated size of the buffered row group,
106///
107/// Call [`Self::flush`] to trigger an early flush of a row group based on a
108/// memory threshold and/or global memory pressure. However,  smaller row groups
109/// result in higher metadata overheads, and thus may worsen compression ratios
110/// and query performance.
111///
112/// ```no_run
113/// # use std::io::Write;
114/// # use arrow_array::RecordBatch;
115/// # use parquet::arrow::ArrowWriter;
116/// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
117/// # let batch: RecordBatch = todo!();
118/// writer.write(&batch).unwrap();
119/// // Trigger an early flush if anticipated size exceeds 1_000_000
120/// if writer.in_progress_size() > 1_000_000 {
121///     writer.flush().unwrap();
122/// }
123/// ```
124///
125/// ## Type Support
126///
127/// The writer supports writing all Arrow [`DataType`]s that have a direct mapping to
128/// Parquet types including  [`StructArray`] and [`ListArray`].
129///
130/// The following are not supported:
131///
132/// * [`IntervalMonthDayNanoArray`]: Parquet does not [support nanosecond intervals].
133///
134/// [`DataType`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html
135/// [`StructArray`]: https://docs.rs/arrow/latest/arrow/array/struct.StructArray.html
136/// [`ListArray`]: https://docs.rs/arrow/latest/arrow/array/type.ListArray.html
137/// [`IntervalMonthDayNanoArray`]: https://docs.rs/arrow/latest/arrow/array/type.IntervalMonthDayNanoArray.html
138/// [support nanosecond intervals]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
139///
140/// ## Type Compatibility
141/// The writer can write Arrow [`RecordBatch`]s that are logically equivalent. This means that for
142/// a  given column, the writer can accept multiple Arrow [`DataType`]s that contain the same
143/// value type.
144///
145/// For example, the following [`DataType`]s are all logically equivalent and can be written
146/// to the same column:
147/// * String, LargeString, StringView
148/// * Binary, LargeBinary, BinaryView
149///
150/// The writer can will also accept both native and dictionary encoded arrays if the dictionaries
151/// contain compatible values.
152/// ```
153/// # use std::sync::Arc;
154/// # use arrow_array::{DictionaryArray, LargeStringArray, RecordBatch, StringArray, UInt8Array};
155/// # use arrow_schema::{DataType, Field, Schema};
156/// # use parquet::arrow::arrow_writer::ArrowWriter;
157/// let record_batch1 = RecordBatch::try_new(
158///    Arc::new(Schema::new(vec![Field::new("col", DataType::LargeUtf8, false)])),
159///    vec![Arc::new(LargeStringArray::from_iter_values(vec!["a", "b"]))]
160///  )
161/// .unwrap();
162///
163/// let mut buffer = Vec::new();
164/// let mut writer = ArrowWriter::try_new(&mut buffer, record_batch1.schema(), None).unwrap();
165/// writer.write(&record_batch1).unwrap();
166///
167/// let record_batch2 = RecordBatch::try_new(
168///     Arc::new(Schema::new(vec![Field::new(
169///         "col",
170///         DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
171///          false,
172///     )])),
173///     vec![Arc::new(DictionaryArray::new(
174///          UInt8Array::from_iter_values(vec![0, 1]),
175///          Arc::new(StringArray::from_iter_values(vec!["b", "c"])),
176///      ))],
177///  )
178///  .unwrap();
179///  writer.write(&record_batch2).unwrap();
180///  writer.close();
181/// ```
182pub struct ArrowWriter<W: Write> {
183    /// Underlying Parquet writer
184    writer: SerializedFileWriter<W>,
185
186    /// The in-progress row group if any
187    in_progress: Option<ArrowRowGroupWriter>,
188
189    /// A copy of the Arrow schema.
190    ///
191    /// The schema is used to verify that each record batch written has the correct schema
192    arrow_schema: SchemaRef,
193
194    /// Creates new [`ArrowRowGroupWriter`] instances as required
195    row_group_writer_factory: ArrowRowGroupWriterFactory,
196
197    /// The maximum number of rows to write to each row group, or None for unlimited
198    max_row_group_row_count: Option<usize>,
199
200    /// The maximum size in bytes for a row group, or None for unlimited
201    max_row_group_bytes: Option<usize>,
202
203    /// CDC chunkers persisted across row groups (one per leaf column).
204    cdc_chunkers: Option<Vec<ContentDefinedChunker>>,
205}
206
207impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
208    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209        let buffered_memory = self.in_progress_size();
210        f.debug_struct("ArrowWriter")
211            .field("writer", &self.writer)
212            .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
213            .field("in_progress_rows", &self.in_progress_rows())
214            .field("arrow_schema", &self.arrow_schema)
215            .field("max_row_group_row_count", &self.max_row_group_row_count)
216            .field("max_row_group_bytes", &self.max_row_group_bytes)
217            .finish()
218    }
219}
220
221impl<W: Write + Send> ArrowWriter<W> {
222    /// Try to create a new Arrow writer
223    ///
224    /// The writer will fail if:
225    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
226    ///  * the Arrow schema contains unsupported datatypes such as Unions
227    pub fn try_new(
228        writer: W,
229        arrow_schema: SchemaRef,
230        props: Option<WriterProperties>,
231    ) -> Result<Self> {
232        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
233        Self::try_new_with_options(writer, arrow_schema, options)
234    }
235
236    /// Try to create a new Arrow writer with [`ArrowWriterOptions`].
237    ///
238    /// The writer will fail if:
239    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
240    ///  * the Arrow schema contains unsupported datatypes such as Unions
241    pub fn try_new_with_options(
242        writer: W,
243        arrow_schema: SchemaRef,
244        options: ArrowWriterOptions,
245    ) -> Result<Self> {
246        let mut props = options.properties;
247
248        let schema = if let Some(parquet_schema) = options.schema_descr {
249            parquet_schema.clone()
250        } else {
251            let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
252            if let Some(schema_root) = &options.schema_root {
253                converter = converter.schema_root(schema_root);
254            }
255
256            converter.convert(&arrow_schema)?
257        };
258
259        if !options.skip_arrow_metadata {
260            // add serialized arrow schema
261            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
262        }
263
264        let max_row_group_row_count = props.max_row_group_row_count();
265        let max_row_group_bytes = props.max_row_group_bytes();
266
267        let props_ptr = Arc::new(props);
268        let file_writer =
269            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;
270
271        let mut row_group_writer_factory =
272            ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone());
273        if let Some(page_store_factory) = options.page_store_factory {
274            row_group_writer_factory =
275                row_group_writer_factory.with_page_store_factory(page_store_factory);
276        }
277
278        let cdc_chunkers = props_ptr
279            .content_defined_chunking()
280            .map(|opts| {
281                file_writer
282                    .schema_descr()
283                    .columns()
284                    .iter()
285                    .map(|desc| ContentDefinedChunker::new(desc, opts))
286                    .collect::<Result<Vec<_>>>()
287            })
288            .transpose()?;
289
290        Ok(Self {
291            writer: file_writer,
292            in_progress: None,
293            arrow_schema,
294            row_group_writer_factory,
295            max_row_group_row_count,
296            max_row_group_bytes,
297            cdc_chunkers,
298        })
299    }
300
301    /// Returns metadata for any flushed row groups
302    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
303        self.writer.flushed_row_groups()
304    }
305
306    /// Estimated memory usage, in bytes, of this `ArrowWriter`
307    ///
308    /// This estimate is formed bu summing the values of
309    /// [`ArrowColumnWriter::memory_size`] all in progress columns.
310    pub fn memory_size(&self) -> usize {
311        match &self.in_progress {
312            Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
313            None => 0,
314        }
315    }
316
317    /// Anticipated encoded size of the in progress row group.
318    ///
319    /// This estimate the row group size after being completely encoded is,
320    /// formed by summing the values of
321    /// [`ArrowColumnWriter::get_estimated_total_bytes`] for all in progress
322    /// columns.
323    pub fn in_progress_size(&self) -> usize {
324        match &self.in_progress {
325            Some(in_progress) => in_progress
326                .writers
327                .iter()
328                .map(|x| x.get_estimated_total_bytes())
329                .sum(),
330            None => 0,
331        }
332    }
333
334    /// Returns the number of rows buffered in the in progress row group
335    pub fn in_progress_rows(&self) -> usize {
336        self.in_progress
337            .as_ref()
338            .map(|x| x.buffered_rows)
339            .unwrap_or_default()
340    }
341
342    /// Returns the number of bytes written by this instance
343    pub fn bytes_written(&self) -> usize {
344        self.writer.bytes_written()
345    }
346
347    /// Encodes the provided [`RecordBatch`]
348    ///
349    /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_row_count`]
350    /// rows or [`WriterProperties::max_row_group_bytes`] bytes, the contents of `batch` will be
351    /// written to one or more row groups such that limits are respected.
352    ///
353    /// If both limits are `None`, all data is written to a single row group.
354    /// If one limit is set, that limit is respected.
355    /// If both limits are set, the lower bound (whichever triggers first) is respected.
356    ///
357    /// This will fail if the `batch`'s schema does not match the writer's schema.
358    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
359        if batch.num_rows() == 0 {
360            return Ok(());
361        }
362
363        let in_progress = match &mut self.in_progress {
364            Some(in_progress) => in_progress,
365            x => x.insert(
366                self.row_group_writer_factory
367                    .create_row_group_writer(self.writer.flushed_row_groups().len())?,
368            ),
369        };
370
371        if let Some(max_rows) = self.max_row_group_row_count {
372            if in_progress.buffered_rows + batch.num_rows() > max_rows {
373                let to_write = max_rows - in_progress.buffered_rows;
374                let a = batch.slice(0, to_write);
375                let b = batch.slice(to_write, batch.num_rows() - to_write);
376                self.write(&a)?;
377                return self.write(&b);
378            }
379        }
380
381        // Check byte limit: if we have buffered data, use measured average row size
382        // to split batch proactively before exceeding byte limit
383        if let Some(max_bytes) = self.max_row_group_bytes {
384            if in_progress.buffered_rows > 0 {
385                let current_bytes = in_progress.get_estimated_total_bytes();
386
387                if current_bytes >= max_bytes {
388                    self.flush()?;
389                    return self.write(batch);
390                }
391
392                let avg_row_bytes = current_bytes / in_progress.buffered_rows;
393                if avg_row_bytes > 0 {
394                    // At this point, `current_bytes < max_bytes` (checked above)
395                    let remaining_bytes = max_bytes - current_bytes;
396                    let rows_that_fit = remaining_bytes / avg_row_bytes;
397
398                    if batch.num_rows() > rows_that_fit {
399                        if rows_that_fit > 0 {
400                            let a = batch.slice(0, rows_that_fit);
401                            let b = batch.slice(rows_that_fit, batch.num_rows() - rows_that_fit);
402                            self.write(&a)?;
403                            return self.write(&b);
404                        } else {
405                            self.flush()?;
406                            return self.write(batch);
407                        }
408                    }
409                }
410            }
411        }
412
413        match self.cdc_chunkers.as_mut() {
414            Some(chunkers) => in_progress.write_with_chunkers(batch, chunkers)?,
415            None => in_progress.write(batch)?,
416        }
417
418        let should_flush = self
419            .max_row_group_row_count
420            .is_some_and(|max| in_progress.buffered_rows >= max)
421            || self
422                .max_row_group_bytes
423                .is_some_and(|max| in_progress.get_estimated_total_bytes() >= max);
424
425        if should_flush {
426            self.flush()?
427        }
428        Ok(())
429    }
430
431    /// Writes the given buf bytes to the internal buffer.
432    ///
433    /// It's safe to use this method to write data to the underlying writer,
434    /// because it will ensure that the buffering and byte‐counting layers are used.
435    pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
436        self.writer.write_all(buf)
437    }
438
439    /// Flushes underlying writer
440    pub fn sync(&mut self) -> std::io::Result<()> {
441        self.writer.flush()
442    }
443
444    /// Flushes all buffered rows into a new row group
445    ///
446    /// Note the underlying writer is not flushed with this call.
447    /// If this is a desired behavior, please call [`ArrowWriter::sync`].
448    pub fn flush(&mut self) -> Result<()> {
449        let in_progress = match self.in_progress.take() {
450            Some(in_progress) => in_progress,
451            None => return Ok(()),
452        };
453
454        let mut row_group_writer = self.writer.next_row_group()?;
455        for chunk in in_progress.close()? {
456            chunk.append_to_row_group(&mut row_group_writer)?;
457        }
458        row_group_writer.close()?;
459        Ok(())
460    }
461
462    /// Additional [`KeyValue`] metadata to be written in addition to those from [`WriterProperties`]
463    ///
464    /// This method provide a way to append kv_metadata after write RecordBatch
465    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
466        self.writer.append_key_value_metadata(kv_metadata)
467    }
468
469    /// Returns a reference to the underlying writer.
470    pub fn inner(&self) -> &W {
471        self.writer.inner()
472    }
473
474    /// Returns a mutable reference to the underlying writer.
475    ///
476    /// **Warning**: if you write directly to this writer, you will skip
477    /// the `TrackedWrite` buffering and byte‐counting layers. That’ll cause
478    /// the file footer’s recorded offsets and sizes to diverge from reality,
479    /// resulting in an unreadable or corrupted Parquet file.
480    ///
481    /// If you want to write safely to the underlying writer, use [`Self::write_all`].
482    pub fn inner_mut(&mut self) -> &mut W {
483        self.writer.inner_mut()
484    }
485
486    /// Flushes any outstanding data and returns the underlying writer.
487    pub fn into_inner(mut self) -> Result<W> {
488        self.flush()?;
489        self.writer.into_inner()
490    }
491
492    /// Close and finalize the underlying Parquet writer
493    ///
494    /// Unlike [`Self::close`] this does not consume self
495    ///
496    /// Attempting to write after calling finish will result in an error
497    pub fn finish(&mut self) -> Result<ParquetMetaData> {
498        self.flush()?;
499        self.writer.finish()
500    }
501
502    /// Close and finalize the underlying Parquet writer
503    pub fn close(mut self) -> Result<ParquetMetaData> {
504        self.finish()
505    }
506
507    /// Create a new row group writer and return its column writers.
508    #[deprecated(
509        since = "56.2.0",
510        note = "Use `ArrowRowGroupWriterFactory` instead, see `ArrowColumnWriter` for an example"
511    )]
512    pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
513        self.flush()?;
514        let in_progress = self
515            .row_group_writer_factory
516            .create_row_group_writer(self.writer.flushed_row_groups().len())?;
517        Ok(in_progress.writers)
518    }
519
520    /// Append the given column chunks to the file as a new row group.
521    #[deprecated(
522        since = "56.2.0",
523        note = "Use `SerializedFileWriter` directly instead, see `ArrowColumnWriter` for an example"
524    )]
525    pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
526        let mut row_group_writer = self.writer.next_row_group()?;
527        for chunk in chunks {
528            chunk.append_to_row_group(&mut row_group_writer)?;
529        }
530        row_group_writer.close()?;
531        Ok(())
532    }
533
534    /// Converts this writer into a lower-level [`SerializedFileWriter`] and [`ArrowRowGroupWriterFactory`].
535    ///
536    /// Flushes any outstanding data before returning.
537    ///
538    /// This can be useful to provide more control over how files are written, for example
539    /// to write columns in parallel. See the example on [`ArrowColumnWriter`].
540    pub fn into_serialized_writer(
541        mut self,
542    ) -> Result<(SerializedFileWriter<W>, ArrowRowGroupWriterFactory)> {
543        self.flush()?;
544        Ok((self.writer, self.row_group_writer_factory))
545    }
546}
547
548impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
549    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
550        self.write(batch).map_err(|e| e.into())
551    }
552
553    fn close(self) -> std::result::Result<(), ArrowError> {
554        self.close()?;
555        Ok(())
556    }
557}
558
559/// Arrow-specific configuration settings for writing parquet files.
560///
561/// See [`ArrowWriter`] for how to configure the writer.
562#[derive(Debug, Clone, Default)]
563pub struct ArrowWriterOptions {
564    properties: WriterProperties,
565    skip_arrow_metadata: bool,
566    schema_root: Option<String>,
567    schema_descr: Option<SchemaDescriptor>,
568    page_store_factory: Option<Arc<dyn PageStoreFactory>>,
569}
570
571impl ArrowWriterOptions {
572    /// Creates a new [`ArrowWriterOptions`] with the default settings.
573    pub fn new() -> Self {
574        Self::default()
575    }
576
577    /// Sets the [`WriterProperties`] for writing parquet files.
578    pub fn with_properties(self, properties: WriterProperties) -> Self {
579        Self { properties, ..self }
580    }
581
582    /// Sets the [`PageStoreFactory`] used to buffer completed pages while a row
583    /// group is being written.
584    ///
585    /// By default (an [`InMemoryPageStore`] per column chunk) completed pages
586    /// are buffered on the heap until the row group is flushed, so peak memory
587    /// grows with the row group size. Supplying a factory that spills to a temp
588    /// file or object storage instead bounds peak write memory, decoupling it
589    /// from the row group size while keeping large, read-optimal row groups.
590    ///
591    /// # Example: a custom [`PageStore`]
592    ///
593    /// A store only has to map an opaque, store-allocated [`PageKey`] to a blob
594    /// and hand the blob back once. The keys need not be dense or sequential —
595    /// here a `HashMap`-backed store mints sparse handles, proving the writer
596    /// relies only on the opaque-handle contract. A real spilling backend would
597    /// write the bytes to a temp file in `put` and read them back in `take`.
598    ///
599    /// ```
600    /// # use std::collections::HashMap;
601    /// # use std::sync::Arc;
602    /// # use bytes::Bytes;
603    /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
604    /// # use parquet::arrow::arrow_writer::{
605    /// #     ArrowWriter, ArrowWriterOptions, PageKey, PageStore, PageStoreArgs, PageStoreFactory,
606    /// # };
607    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
608    /// # use parquet::errors::{ParquetError, Result};
609    /// #[derive(Default)]
610    /// struct MapPageStore {
611    ///     blobs: HashMap<u64, Bytes>,
612    ///     next: u64,
613    /// }
614    ///
615    /// impl PageStore for MapPageStore {
616    ///     fn put(&mut self, value: Bytes) -> Result<PageKey> {
617    ///         // Mint a sparse handle (every other integer) to show the writer
618    ///         // never assumes anything about the key's value.
619    ///         let key = PageKey::new(self.next);
620    ///         self.next += 2;
621    ///         self.blobs.insert(key.get(), value);
622    ///         Ok(key)
623    ///     }
624    ///
625    ///     fn take(&mut self, key: PageKey) -> Result<Bytes> {
626    ///         self.blobs
627    ///             .remove(&key.get())
628    ///             .ok_or_else(|| ParquetError::General(format!("invalid key {}", key.get())))
629    ///     }
630    /// }
631    ///
632    /// #[derive(Debug)]
633    /// struct MapPageStoreFactory;
634    ///
635    /// impl PageStoreFactory for MapPageStoreFactory {
636    ///     fn create(&self, args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
637    ///         // `args` exposes the column index and descriptor (physical/logical
638    ///         // type, path), so a real backend could spill only large columns.
639    ///         let _ = (args.column_index(), args.column_descriptor());
640    ///         Ok(Box::new(MapPageStore::default()))
641    ///     }
642    /// }
643    ///
644    /// let col = Arc::new(Int64Array::from_iter_values(0..1000)) as ArrayRef;
645    /// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
646    ///
647    /// let options =
648    ///     ArrowWriterOptions::new().with_page_store_factory(Arc::new(MapPageStoreFactory));
649    /// let mut buffer = Vec::new();
650    /// let mut writer =
651    ///     ArrowWriter::try_new_with_options(&mut buffer, to_write.schema(), options).unwrap();
652    /// writer.write(&to_write).unwrap();
653    /// writer.close().unwrap();
654    ///
655    /// // The file is byte-identical to one written with the default store.
656    /// let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap();
657    /// assert_eq!(to_write, reader.next().unwrap().unwrap());
658    /// ```
659    pub fn with_page_store_factory(self, page_store_factory: Arc<dyn PageStoreFactory>) -> Self {
660        Self {
661            page_store_factory: Some(page_store_factory),
662            ..self
663        }
664    }
665
666    /// Skip encoding the embedded arrow metadata (defaults to `false`)
667    ///
668    /// Parquet files generated by the [`ArrowWriter`] contain embedded arrow schema
669    /// by default.
670    ///
671    /// Set `skip_arrow_metadata` to true, to skip encoding the embedded metadata.
672    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
673        Self {
674            skip_arrow_metadata,
675            ..self
676        }
677    }
678
679    /// Set the name of the root parquet schema element (defaults to `"arrow_schema"`)
680    pub fn with_schema_root(self, schema_root: String) -> Self {
681        Self {
682            schema_root: Some(schema_root),
683            ..self
684        }
685    }
686
687    /// Explicitly specify the Parquet schema to be used
688    ///
689    /// If omitted (the default), the [`ArrowSchemaConverter`] is used to compute the
690    /// Parquet [`SchemaDescriptor`]. This may be used When the [`SchemaDescriptor`] is
691    /// already known or must be calculated using custom logic.
692    pub fn with_parquet_schema(self, schema_descr: SchemaDescriptor) -> Self {
693        Self {
694            schema_descr: Some(schema_descr),
695            ..self
696        }
697    }
698}
699
700/// A single column chunk produced by [`ArrowColumnWriter`].
701///
702/// Holds the serialized page blobs (each page's header ‖ compressed data, in
703/// write order) in a [`PageStore`], plus the handles needed to read them back,
704/// in order, when the chunk is spliced into the output file.
705struct ArrowColumnChunkData {
706    length: usize,
707    store: Box<dyn PageStore>,
708    keys: Vec<PageKey>,
709    /// Handles to the dictionary page's blobs (header then data) in the store.
710    ///
711    /// A dictionary page is produced at most once and bounded by
712    /// `dict_page_size_limit`, but it must be written *first* in the chunk even
713    /// though the data pages reach the writer before it (see
714    /// [`PageWriter::defers_dictionary_ordering`]). Its header and data are `put`
715    /// into the store like any other page — which keeps the store uniform, and
716    /// lets an oversized dictionary page spill — and their handles are held apart
717    /// so they can be emitted ahead of the data pages at splice.
718    /// Empty for non-dictionary columns.
719    dictionary_keys: Vec<PageKey>,
720    /// Serialized length of the dictionary page (0 if there is none), recorded
721    /// so the data pages can be shifted past it when offsets are rewritten to a
722    /// dictionary-first layout at splice.
723    dictionary_len: usize,
724}
725
726impl ArrowColumnChunkData {
727    fn new(store: Box<dyn PageStore>) -> Self {
728        Self {
729            length: 0,
730            store,
731            keys: Vec::new(),
732            dictionary_keys: Vec::new(),
733            dictionary_len: 0,
734        }
735    }
736
737    /// Append a data-page blob to the store, recording its handle in write
738    /// order.
739    fn push(&mut self, value: Bytes) -> Result<()> {
740        let key = self.store.put(value)?;
741        self.keys.push(key);
742        Ok(())
743    }
744
745    /// Store a dictionary-page blob (header or data) in the page store,
746    /// recording its handle (emitted first at splice) and accumulating its
747    /// serialized length.
748    fn push_dictionary(&mut self, value: Bytes) -> Result<()> {
749        self.dictionary_len += value.len();
750        let key = self.store.put(value)?;
751        self.dictionary_keys.push(key);
752        Ok(())
753    }
754
755    /// Bytes this chunk currently holds on the heap: whatever the store keeps
756    /// resident (zero for a spilling backend).
757    fn memory_size(&self) -> usize {
758        self.store.memory_size()
759    }
760}
761
762/// A streaming [`Read`] over one column chunk's buffered pages, in final file
763/// order: the dictionary page (if any) first, then the data pages.
764///
765/// Each blob is taken back out of the [`PageStore`] *as it is
766/// consumed* and released immediately afterwards, so splicing a chunk into the
767/// output file never materializes more than a single page in memory at a time.
768/// This is what keeps the splice phase within the memory bound for a spilling
769/// backend (an in-memory store already holds the bytes, so it is unaffected).
770struct StreamingColumnChunkReader {
771    store: Box<dyn PageStore>,
772    /// Page handles in final file order: the dictionary page first (if any),
773    /// then the data pages.
774    keys: IntoIter<PageKey>,
775    /// The blob currently being drained into the output; emptied as it is read.
776    current: Bytes,
777}
778
779impl StreamingColumnChunkReader {
780    fn new(data: ArrowColumnChunkData) -> Self {
781        // The dictionary page must be emitted first, ahead of the data pages,
782        // even though it was the last page produced.
783        let keys = if data.dictionary_keys.is_empty() {
784            data.keys
785        } else {
786            let mut keys = Vec::with_capacity(data.dictionary_keys.len() + data.keys.len());
787            keys.extend(data.dictionary_keys);
788            keys.extend(data.keys);
789            keys
790        };
791        Self {
792            store: data.store,
793            keys: keys.into_iter(),
794            current: Bytes::new(),
795        }
796    }
797}
798
799impl Read for StreamingColumnChunkReader {
800    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
801        // Refill from the next blob whenever the current one is drained: the
802        // dictionary page first, then each data page, all taken from the store.
803        while self.current.is_empty() {
804            if let Some(key) = self.keys.next() {
805                self.current = self.store.take(key).map_err(std::io::Error::other)?;
806            } else {
807                return Ok(0);
808            }
809        }
810
811        let len = self.current.len().min(out.len());
812        let b = self.current.split_to(len);
813        out[..len].copy_from_slice(&b);
814        Ok(len)
815    }
816}
817
818/// A shared [`ArrowColumnChunkData`]
819///
820/// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access via
821/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows
822type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
823
824struct ArrowPageWriter {
825    buffer: SharedColumnChunk,
826    #[cfg(feature = "encryption")]
827    page_encryptor: Option<PageEncryptor>,
828}
829
830impl ArrowPageWriter {
831    /// Create a page writer that buffers completed pages in `store`.
832    fn new(store: Box<dyn PageStore>) -> Self {
833        Self {
834            buffer: Arc::new(Mutex::new(ArrowColumnChunkData::new(store))),
835            #[cfg(feature = "encryption")]
836            page_encryptor: None,
837        }
838    }
839
840    #[cfg(feature = "encryption")]
841    pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
842        self.page_encryptor = page_encryptor;
843        self
844    }
845
846    #[cfg(feature = "encryption")]
847    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
848        self.page_encryptor.as_mut()
849    }
850
851    #[cfg(not(feature = "encryption"))]
852    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
853        None
854    }
855}
856
857impl PageWriter for ArrowPageWriter {
858    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
859        let page = match self.page_encryptor_mut() {
860            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
861            None => page,
862        };
863
864        let page_header = page.to_thrift_header()?;
865        let header = {
866            let mut header = Vec::with_capacity(1024);
867
868            match self.page_encryptor_mut() {
869                Some(page_encryptor) => {
870                    page_encryptor.encrypt_page_header(&page_header, &mut header)?;
871                    if page.compressed_page().is_data_page() {
872                        page_encryptor.increment_page();
873                    }
874                }
875                None => {
876                    let mut protocol = ThriftCompactOutputProtocol::new(&mut header);
877                    page_header.write_thrift(&mut protocol)?;
878                }
879            };
880
881            Bytes::from(header)
882        };
883
884        let mut buf = self.buffer.try_lock().unwrap();
885
886        let data = page.compressed_page().buffer().clone();
887        let compressed_size = data.len() + header.len();
888
889        let mut spec = PageWriteSpec::new();
890        spec.page_type = page.page_type();
891        spec.num_values = page.num_values();
892        spec.uncompressed_size = page.uncompressed_size() + header.len();
893        spec.offset = buf.length as u64;
894        spec.compressed_size = compressed_size;
895        spec.bytes_written = compressed_size as u64;
896
897        buf.length += compressed_size;
898        if spec.page_type == PageType::DICTIONARY_PAGE {
899            // Recorded apart from the data pages so it is emitted first at
900            // splice — see `ArrowColumnChunkData::dictionary_keys`.
901            buf.push_dictionary(header)?;
902            buf.push_dictionary(data)?;
903        } else {
904            buf.push(header)?;
905            buf.push(data)?;
906        }
907
908        Ok(spec)
909    }
910
911    fn defers_dictionary_ordering(&self) -> bool {
912        // The Arrow chunk is buffered in full and spliced at row-group flush, so
913        // data pages may be accepted before the dictionary page and reordered
914        // then. This lets `GenericColumnWriter` stream dictionary-column data
915        // pages straight through instead of buffering them in memory.
916        true
917    }
918
919    fn buffered_memory_size(&self) -> usize {
920        // Only what is actually resident: a spilling store reports ~0 here even
921        // though the chunk's bytes have all passed through it.
922        self.buffer.try_lock().unwrap().memory_size()
923    }
924
925    fn close(&mut self) -> Result<()> {
926        Ok(())
927    }
928}
929
930/// A leaf column that can be encoded by [`ArrowColumnWriter`]
931#[derive(Debug)]
932pub struct ArrowLeafColumn(ArrayLevels);
933
934/// Computes the [`ArrowLeafColumn`] for a potentially nested [`ArrayRef`]
935///
936/// This function can be used along with [`get_column_writers`] to encode
937/// individual columns in parallel. See example on [`ArrowColumnWriter`]
938pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
939    let levels = calculate_array_levels(array, field)?;
940    Ok(levels.into_iter().map(ArrowLeafColumn).collect())
941}
942
943/// The data for a single column chunk, see [`ArrowColumnWriter`]
944pub struct ArrowColumnChunk {
945    data: ArrowColumnChunkData,
946    close: ColumnCloseResult,
947}
948
949impl std::fmt::Debug for ArrowColumnChunk {
950    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
951        f.debug_struct("ArrowColumnChunk")
952            .field("length", &self.data.length)
953            .finish_non_exhaustive()
954    }
955}
956
957impl ArrowColumnChunk {
958    /// Returns the [`ColumnCloseResult`] produced when the chunk was closed.
959    ///
960    /// Exposes encoding information, collected statistics, and the optional
961    /// [`ColumnIndexMetaData`](crate::file::page_index::column_index::ColumnIndexMetaData)
962    /// / [`OffsetIndexMetaData`](crate::file::page_index::offset_index::OffsetIndexMetaData)
963    /// gathered for the column chunk.
964    pub fn close(&self) -> &ColumnCloseResult {
965        &self.close
966    }
967
968    /// Returns a mutable reference to the [`ColumnCloseResult`].
969    ///
970    /// This allows callers to mutate the close result before the chunk is
971    /// appended to a row group — for example, clearing `column_index` or
972    /// `bloom_filter` based on a dynamic rule that inspects the encodings and
973    /// collected page statistics.
974    pub fn close_mut(&mut self) -> &mut ColumnCloseResult {
975        &mut self.close
976    }
977
978    /// Splices this column's buffered pages into the row group, streaming them
979    /// back out of the [`PageStore`] one page at a time.
980    pub fn append_to_row_group<W: Write + Send>(
981        self,
982        writer: &mut SerializedRowGroupWriter<'_, W>,
983    ) -> Result<()> {
984        let ArrowColumnChunk { data, close } = self;
985
986        // The dictionary page is produced *after* the data pages on this path (so
987        // they can stream straight through) but must be written *first*, so move
988        // it ahead of the data pages in the recorded offsets before the splice.
989        let close = close.update_dictionary_location(data.dictionary_len)?;
990
991        let reader = StreamingColumnChunkReader::new(data);
992        writer.append_column_from_read(reader, close)
993    }
994}
995
996/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
997///
998/// `ArrowColumnWriter` instances can be created using an [`ArrowRowGroupWriterFactory`];
999///
1000/// Note: This is a low-level interface for applications that require
1001/// fine-grained control of encoding (e.g. encoding using multiple threads),
1002/// see [`ArrowWriter`] for a higher-level interface
1003///
1004/// # Example: Encoding two Arrow Array's in Parallel
1005/// ```
1006/// // The arrow schema
1007/// # use std::sync::Arc;
1008/// # use arrow_array::*;
1009/// # use arrow_schema::*;
1010/// # use parquet::arrow::ArrowSchemaConverter;
1011/// # use parquet::arrow::arrow_writer::{compute_leaves, ArrowColumnChunk, ArrowLeafColumn, ArrowRowGroupWriterFactory};
1012/// # use parquet::file::properties::WriterProperties;
1013/// # use parquet::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
1014/// #
1015/// let schema = Arc::new(Schema::new(vec![
1016///     Field::new("i32", DataType::Int32, false),
1017///     Field::new("f32", DataType::Float32, false),
1018/// ]));
1019///
1020/// // Compute the parquet schema
1021/// let props = Arc::new(WriterProperties::default());
1022/// let parquet_schema = ArrowSchemaConverter::new()
1023///   .with_coerce_types(props.coerce_types())
1024///   .convert(&schema)
1025///   .unwrap();
1026///
1027/// // Create parquet writer
1028/// let root_schema = parquet_schema.root_schema_ptr();
1029/// // write to memory in the example, but this could be a File
1030/// let mut out = Vec::with_capacity(1024);
1031/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone())
1032///   .unwrap();
1033///
1034/// // Create a factory for building Arrow column writers
1035/// let row_group_factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(&schema));
1036/// // Create column writers for the 0th row group
1037/// let col_writers = row_group_factory.create_column_writers(0).unwrap();
1038///
1039/// // Spawn a worker thread for each column
1040/// //
1041/// // Note: This is for demonstration purposes, a thread-pool e.g. rayon or tokio, would be better.
1042/// // The `map` produces an iterator of type `tuple of (thread handle, send channel)`.
1043/// let mut workers: Vec<_> = col_writers
1044///     .into_iter()
1045///     .map(|mut col_writer| {
1046///         let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
1047///         let handle = std::thread::spawn(move || {
1048///             // receive Arrays to encode via the channel
1049///             for col in recv {
1050///                 col_writer.write(&col)?;
1051///             }
1052///             // once the input is complete, close the writer
1053///             // to return the newly created ArrowColumnChunk
1054///             col_writer.close()
1055///         });
1056///         (handle, send)
1057///     })
1058///     .collect();
1059///
1060/// // Start row group
1061/// let mut row_group_writer: SerializedRowGroupWriter<'_, _> = writer
1062///   .next_row_group()
1063///   .unwrap();
1064///
1065/// // Create some example input columns to encode
1066/// let to_write = vec![
1067///     Arc::new(Int32Array::from_iter_values([1, 2, 3])) as _,
1068///     Arc::new(Float32Array::from_iter_values([1., 45., -1.])) as _,
1069/// ];
1070///
1071/// // Send the input columns to the workers
1072/// let mut worker_iter = workers.iter_mut();
1073/// for (arr, field) in to_write.iter().zip(&schema.fields) {
1074///     for leaves in compute_leaves(field, arr).unwrap() {
1075///         worker_iter.next().unwrap().1.send(leaves).unwrap();
1076///     }
1077/// }
1078///
1079/// // Wait for the workers to complete encoding, and append
1080/// // the resulting column chunks to the row group (and the file)
1081/// for (handle, send) in workers {
1082///     drop(send); // Drop send side to signal termination
1083///     // wait for the worker to send the completed chunk
1084///     let chunk: ArrowColumnChunk = handle.join().unwrap().unwrap();
1085///     chunk.append_to_row_group(&mut row_group_writer).unwrap();
1086/// }
1087/// // Close the row group which writes to the underlying file
1088/// row_group_writer.close().unwrap();
1089///
1090/// let metadata = writer.close().unwrap();
1091/// assert_eq!(metadata.file_metadata().num_rows(), 3);
1092/// ```
1093pub struct ArrowColumnWriter {
1094    writer: ArrowColumnWriterImpl,
1095    chunk: SharedColumnChunk,
1096}
1097
1098impl std::fmt::Debug for ArrowColumnWriter {
1099    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1100        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
1101    }
1102}
1103
1104enum ArrowColumnWriterImpl {
1105    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
1106    Column(ColumnWriter<'static>),
1107}
1108
1109impl ArrowColumnWriter {
1110    /// Write an [`ArrowLeafColumn`]
1111    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
1112        self.write_internal(&col.0)
1113    }
1114
1115    /// Write with content-defined chunking, inserting page flushes at chunk boundaries.
1116    fn write_with_chunker(
1117        &mut self,
1118        col: &ArrowLeafColumn,
1119        chunker: &mut ContentDefinedChunker,
1120    ) -> Result<()> {
1121        let levels = &col.0;
1122        let chunks = chunker.get_arrow_chunks(
1123            levels.def_level_data().as_ref(),
1124            levels.rep_level_data().as_ref(),
1125            levels.array(),
1126        )?;
1127
1128        let num_chunks = chunks.len();
1129        for (i, chunk) in chunks.iter().enumerate() {
1130            let chunk_levels = levels.slice_for_chunk(chunk);
1131            self.write_internal(&chunk_levels)?;
1132
1133            // Add a page break after each chunk except the last
1134            if i + 1 < num_chunks {
1135                match &mut self.writer {
1136                    ArrowColumnWriterImpl::Column(c) => c.add_data_page()?,
1137                    ArrowColumnWriterImpl::ByteArray(c) => c.add_data_page()?,
1138                }
1139            }
1140        }
1141        Ok(())
1142    }
1143
1144    fn write_internal(&mut self, levels: &ArrayLevels) -> Result<()> {
1145        match &mut self.writer {
1146            ArrowColumnWriterImpl::Column(c) => {
1147                let leaf = levels.array();
1148                match leaf.as_any_dictionary_opt() {
1149                    Some(dictionary) => {
1150                        let materialized =
1151                            arrow_select::take::take(dictionary.values(), dictionary.keys(), None)?;
1152                        write_leaf(c, &materialized, levels)?
1153                    }
1154                    None => write_leaf(c, leaf, levels)?,
1155                };
1156            }
1157            ArrowColumnWriterImpl::ByteArray(c) => {
1158                write_primitive(c, levels.array().as_ref(), levels)?;
1159            }
1160        }
1161        Ok(())
1162    }
1163
1164    /// Close this column returning the written [`ArrowColumnChunk`]
1165    pub fn close(self) -> Result<ArrowColumnChunk> {
1166        let close = match self.writer {
1167            ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
1168            ArrowColumnWriterImpl::Column(c) => c.close()?,
1169        };
1170        let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
1171        let data = chunk.into_inner().unwrap();
1172        Ok(ArrowColumnChunk { data, close })
1173    }
1174
1175    /// Returns the estimated total memory usage by the writer.
1176    ///
1177    /// This  [`Self::get_estimated_total_bytes`] this is an estimate
1178    /// of the current memory usage and not it's anticipated encoded size.
1179    ///
1180    /// This includes:
1181    /// 1. Data buffered in encoded form
1182    /// 2. Data buffered in un-encoded form (e.g. `usize` dictionary keys)
1183    ///
1184    /// This value should be greater than or equal to [`Self::get_estimated_total_bytes`]
1185    pub fn memory_size(&self) -> usize {
1186        match &self.writer {
1187            ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
1188            ArrowColumnWriterImpl::Column(c) => c.memory_size(),
1189        }
1190    }
1191
1192    /// Returns the estimated total encoded bytes for this column writer.
1193    ///
1194    /// This includes:
1195    /// 1. Data buffered in encoded form
1196    /// 2. An estimate of how large the data buffered in un-encoded form would be once encoded
1197    ///
1198    /// This value should be less than or equal to [`Self::memory_size`]
1199    pub fn get_estimated_total_bytes(&self) -> usize {
1200        match &self.writer {
1201            ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
1202            ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
1203        }
1204    }
1205}
1206
1207/// Encodes [`RecordBatch`] to a parquet row group
1208///
1209/// Note: this structure is created by [`ArrowRowGroupWriterFactory`] internally used to
1210/// create [`ArrowRowGroupWriter`]s, but it is not exposed publicly.
1211///
1212/// See the example on [`ArrowColumnWriter`] for how to encode columns in parallel
1213#[derive(Debug)]
1214struct ArrowRowGroupWriter {
1215    writers: Vec<ArrowColumnWriter>,
1216    schema: SchemaRef,
1217    buffered_rows: usize,
1218}
1219
1220impl ArrowRowGroupWriter {
1221    fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
1222        Self {
1223            writers,
1224            schema: arrow.clone(),
1225            buffered_rows: 0,
1226        }
1227    }
1228
1229    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
1230        self.buffered_rows += batch.num_rows();
1231        let mut writers = self.writers.iter_mut();
1232        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
1233            for leaf in compute_leaves(field.as_ref(), column)? {
1234                writers.next().unwrap().write(&leaf)?;
1235            }
1236        }
1237        Ok(())
1238    }
1239
1240    fn write_with_chunkers(
1241        &mut self,
1242        batch: &RecordBatch,
1243        chunkers: &mut [ContentDefinedChunker],
1244    ) -> Result<()> {
1245        self.buffered_rows += batch.num_rows();
1246        let mut writers = self.writers.iter_mut();
1247        let mut chunkers = chunkers.iter_mut();
1248        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
1249            for leaf in compute_leaves(field.as_ref(), column)? {
1250                writers
1251                    .next()
1252                    .unwrap()
1253                    .write_with_chunker(&leaf, chunkers.next().unwrap())?;
1254            }
1255        }
1256        Ok(())
1257    }
1258
1259    /// Returns the estimated total encoded bytes for this row group
1260    fn get_estimated_total_bytes(&self) -> usize {
1261        self.writers
1262            .iter()
1263            .map(|x| x.get_estimated_total_bytes())
1264            .sum()
1265    }
1266
1267    fn close(self) -> Result<Vec<ArrowColumnChunk>> {
1268        self.writers
1269            .into_iter()
1270            .map(|writer| writer.close())
1271            .collect()
1272    }
1273}
1274
1275/// Factory that creates new column writers for each row group in the Parquet file.
1276///
1277/// You can create this structure via an [`ArrowWriter::into_serialized_writer`].
1278/// See the example on [`ArrowColumnWriter`] for how to encode columns in parallel
1279#[derive(Debug)]
1280pub struct ArrowRowGroupWriterFactory {
1281    schema: SchemaDescPtr,
1282    arrow_schema: SchemaRef,
1283    props: WriterPropertiesPtr,
1284    page_store_factory: Arc<dyn PageStoreFactory>,
1285    #[cfg(feature = "encryption")]
1286    file_encryptor: Option<Arc<FileEncryptor>>,
1287}
1288
1289impl ArrowRowGroupWriterFactory {
1290    /// Create a new [`ArrowRowGroupWriterFactory`] for the provided file writer and Arrow schema
1291    pub fn new<W: Write + Send>(
1292        file_writer: &SerializedFileWriter<W>,
1293        arrow_schema: SchemaRef,
1294    ) -> Self {
1295        let schema = Arc::clone(file_writer.schema_descr_ptr());
1296        let props = Arc::clone(file_writer.properties());
1297        Self {
1298            schema,
1299            arrow_schema,
1300            props,
1301            page_store_factory: Arc::new(InMemoryPageStoreFactory),
1302            #[cfg(feature = "encryption")]
1303            file_encryptor: file_writer.file_encryptor(),
1304        }
1305    }
1306
1307    /// Set the [`PageStoreFactory`] used to allocate the buffer for each column
1308    /// chunk, e.g. to spill completed pages to a temp file or object storage
1309    /// instead of the heap. Defaults to [`InMemoryPageStoreFactory`].
1310    pub fn with_page_store_factory(
1311        mut self,
1312        page_store_factory: Arc<dyn PageStoreFactory>,
1313    ) -> Self {
1314        self.page_store_factory = page_store_factory;
1315        self
1316    }
1317
1318    fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
1319        let writers = self.create_column_writers(row_group_index)?;
1320        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
1321    }
1322
1323    /// Create column writers for a new row group, with the given row group index
1324    pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
1325        let mut writers = Vec::with_capacity(self.arrow_schema.fields.len());
1326        let mut leaves = self.schema.columns().iter();
1327        let column_factory = self.column_writer_factory(row_group_index);
1328        for field in &self.arrow_schema.fields {
1329            column_factory.get_arrow_column_writer(
1330                field.data_type(),
1331                &self.props,
1332                &mut leaves,
1333                &mut writers,
1334            )?;
1335        }
1336        Ok(writers)
1337    }
1338
1339    #[cfg(feature = "encryption")]
1340    fn column_writer_factory(&self, row_group_idx: usize) -> ArrowColumnWriterFactory {
1341        ArrowColumnWriterFactory::new()
1342            .with_page_store_factory(self.page_store_factory.clone())
1343            .with_file_encryptor(row_group_idx, self.file_encryptor.clone())
1344    }
1345
1346    #[cfg(not(feature = "encryption"))]
1347    fn column_writer_factory(&self, _row_group_idx: usize) -> ArrowColumnWriterFactory {
1348        ArrowColumnWriterFactory::new().with_page_store_factory(self.page_store_factory.clone())
1349    }
1350}
1351
1352/// Returns [`ArrowColumnWriter`]s for each column in a given schema
1353#[deprecated(since = "57.0.0", note = "Use `ArrowRowGroupWriterFactory` instead")]
1354pub fn get_column_writers(
1355    parquet: &SchemaDescriptor,
1356    props: &WriterPropertiesPtr,
1357    arrow: &SchemaRef,
1358) -> Result<Vec<ArrowColumnWriter>> {
1359    let mut writers = Vec::with_capacity(arrow.fields.len());
1360    let mut leaves = parquet.columns().iter();
1361    let column_factory = ArrowColumnWriterFactory::new();
1362    for field in &arrow.fields {
1363        column_factory.get_arrow_column_writer(
1364            field.data_type(),
1365            props,
1366            &mut leaves,
1367            &mut writers,
1368        )?;
1369    }
1370    Ok(writers)
1371}
1372
1373/// Creates [`ArrowColumnWriter`] instances
1374struct ArrowColumnWriterFactory {
1375    /// Allocates the per-column-chunk [`PageStore`] backing each page writer.
1376    page_store_factory: Arc<dyn PageStoreFactory>,
1377    #[cfg(feature = "encryption")]
1378    row_group_index: usize,
1379    #[cfg(feature = "encryption")]
1380    file_encryptor: Option<Arc<FileEncryptor>>,
1381}
1382
1383impl ArrowColumnWriterFactory {
1384    pub fn new() -> Self {
1385        Self {
1386            page_store_factory: Arc::new(InMemoryPageStoreFactory),
1387            #[cfg(feature = "encryption")]
1388            row_group_index: 0,
1389            #[cfg(feature = "encryption")]
1390            file_encryptor: None,
1391        }
1392    }
1393
1394    /// Use `page_store_factory` to allocate the buffer for each column chunk.
1395    pub fn with_page_store_factory(
1396        mut self,
1397        page_store_factory: Arc<dyn PageStoreFactory>,
1398    ) -> Self {
1399        self.page_store_factory = page_store_factory;
1400        self
1401    }
1402
1403    #[cfg(feature = "encryption")]
1404    pub fn with_file_encryptor(
1405        mut self,
1406        row_group_index: usize,
1407        file_encryptor: Option<Arc<FileEncryptor>>,
1408    ) -> Self {
1409        self.row_group_index = row_group_index;
1410        self.file_encryptor = file_encryptor;
1411        self
1412    }
1413
1414    #[cfg(feature = "encryption")]
1415    fn create_page_writer(
1416        &self,
1417        column_descriptor: &ColumnDescPtr,
1418        column_index: usize,
1419    ) -> Result<Box<ArrowPageWriter>> {
1420        let column_path = column_descriptor.path().string();
1421        let page_encryptor = PageEncryptor::create_if_column_encrypted(
1422            &self.file_encryptor,
1423            self.row_group_index,
1424            column_index,
1425            &column_path,
1426        )?;
1427        let args = PageStoreArgs::new(column_index, column_descriptor);
1428        let store = self.page_store_factory.create(&args)?;
1429        Ok(Box::new(
1430            ArrowPageWriter::new(store).with_encryptor(page_encryptor),
1431        ))
1432    }
1433
1434    #[cfg(not(feature = "encryption"))]
1435    fn create_page_writer(
1436        &self,
1437        column_descriptor: &ColumnDescPtr,
1438        column_index: usize,
1439    ) -> Result<Box<ArrowPageWriter>> {
1440        let args = PageStoreArgs::new(column_index, column_descriptor);
1441        let store = self.page_store_factory.create(&args)?;
1442        Ok(Box::new(ArrowPageWriter::new(store)))
1443    }
1444
1445    /// Gets an [`ArrowColumnWriter`] for the given `data_type`, appending the
1446    /// output ColumnDesc to `leaves` and the column writers to `out`
1447    fn get_arrow_column_writer(
1448        &self,
1449        data_type: &ArrowDataType,
1450        props: &WriterPropertiesPtr,
1451        leaves: &mut Iter<'_, ColumnDescPtr>,
1452        out: &mut Vec<ArrowColumnWriter>,
1453    ) -> Result<()> {
1454        // Instantiate writers for normal columns
1455        let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1456            let page_writer = self.create_page_writer(desc, out.len())?;
1457            let chunk = page_writer.buffer.clone();
1458            let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
1459            Ok(ArrowColumnWriter {
1460                chunk,
1461                writer: ArrowColumnWriterImpl::Column(writer),
1462            })
1463        };
1464
1465        // Instantiate writers for byte arrays (e.g. Utf8,  Binary, etc)
1466        let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1467            let page_writer = self.create_page_writer(desc, out.len())?;
1468            let chunk = page_writer.buffer.clone();
1469            let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
1470            Ok(ArrowColumnWriter {
1471                chunk,
1472                writer: ArrowColumnWriterImpl::ByteArray(writer),
1473            })
1474        };
1475
1476        match data_type {
1477            _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
1478            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
1479                out.push(col(leaves.next().unwrap())?)
1480            }
1481            ArrowDataType::LargeBinary
1482            | ArrowDataType::Binary
1483            | ArrowDataType::Utf8
1484            | ArrowDataType::LargeUtf8
1485            | ArrowDataType::BinaryView
1486            | ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
1487            ArrowDataType::List(f)
1488            | ArrowDataType::LargeList(f)
1489            | ArrowDataType::FixedSizeList(f, _)
1490            | ArrowDataType::ListView(f)
1491            | ArrowDataType::LargeListView(f) => {
1492                self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
1493            }
1494            ArrowDataType::Struct(fields) => {
1495                for field in fields {
1496                    self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
1497                }
1498            }
1499            ArrowDataType::Map(f, _) => match f.data_type() {
1500                ArrowDataType::Struct(f) => {
1501                    self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
1502                    self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
1503                }
1504                _ => unreachable!("invalid map type"),
1505            },
1506            ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1507                ArrowDataType::Utf8
1508                | ArrowDataType::LargeUtf8
1509                | ArrowDataType::Binary
1510                | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
1511                ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
1512                    out.push(bytes(leaves.next().unwrap())?)
1513                }
1514                ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
1515                _ => out.push(col(leaves.next().unwrap())?),
1516            },
1517            ArrowDataType::RunEndEncoded(_, value_field) => {
1518                self.get_arrow_column_writer(value_field.data_type(), props, leaves, out)?
1519            }
1520            _ => {
1521                return Err(ParquetError::NYI(format!(
1522                    "Attempting to write an Arrow type {data_type} to parquet that is not yet implemented"
1523                )));
1524            }
1525        }
1526        Ok(())
1527    }
1528}
1529
/// Writes one leaf `column`, together with its definition/repetition `levels`,
/// to the typed column `writer`, returning the count reported by
/// `write_batch_internal`.
///
/// The variant of `writer` (the Parquet physical type) decides how the Arrow
/// values are converted; this dispatch should stay in sync with
/// `arrow_to_parquet_type`:
///
/// - INT32 / INT64 / FLOAT / DOUBLE: the array's *complete* values buffer is
///   handed to [`write_primitive`], which also passes
///   `levels.non_null_indices()` so the encoder picks the non-null entries
///   itself. Arrow types whose native representation differs are first
///   converted (widened, narrowed, or reinterpreted) into a temporary array.
/// - BOOLEAN / FIXED_LEN_BYTE_ARRAY: only the non-null values (selected by
///   `levels.non_null_indices()`) are materialized, and no index list is
///   passed to the writer.
///
/// # Errors
///
/// Returns [`ParquetError::General`] when the Arrow type cannot be coerced to
/// INT32 or INT64. Returns [`ParquetError::NYI`] for interval units other than
/// YearMonth / DayTime, and for any other Arrow type routed to a
/// FIXED_LEN_BYTE_ARRAY column.
///
/// # Panics
///
/// Panics on an INT96 writer, which the Arrow writer does not currently
/// produce. Also panics on a BYTE_ARRAY writer, because byte arrays must be
/// written through the dedicated byte-array writer instead.
fn write_leaf(
    writer: &mut ColumnWriter<'_>,
    column: &dyn arrow_array::Array,
    levels: &ArrayLevels,
) -> Result<usize> {
    // Positions of the non-null slots; used below by the paths that gather
    // values eagerly (BOOLEAN and FIXED_LEN_BYTE_ARRAY).
    let indices = levels.non_null_indices();

    match writer {
        // Note: this should match the contents of arrow_to_parquet_type
        ColumnWriter::Int32ColumnWriter(typed) => {
            match column.data_type() {
                ArrowDataType::Null => {
                    // A Null column stores no values; an all-null Int32 array of the
                    // same length stands in as the (placeholder) values buffer.
                    let array = Int32Array::new_null(column.len());
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Int8 => {
                    // Narrow integer types are widened to i32 (lossless for
                    // Int8/Int16/UInt8/UInt16).
                    let array: Int32Array = column.as_primitive::<Int8Type>().unary(|x| x as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Int16 => {
                    let array: Int32Array = column.as_primitive::<Int16Type>().unary(|x| x as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Int32 => {
                    write_primitive(typed, column.as_primitive::<Int32Type>().values(), levels)
                }
                ArrowDataType::UInt8 => {
                    let array: Int32Array = column.as_primitive::<UInt8Type>().unary(|x| x as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::UInt16 => {
                    let array: Int32Array = column.as_primitive::<UInt16Type>().unary(|x| x as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::UInt32 => {
                    // follow C++ implementation and use overflow/reinterpret cast from  u32 to i32 which will map
                    // `(i32::MAX as u32)..u32::MAX` to `i32::MIN..0`
                    let array = column.as_primitive::<UInt32Type>();
                    write_primitive(typed, array.values().inner().typed_data(), levels)
                }
                ArrowDataType::Date32 => {
                    let array = column.as_primitive::<Date32Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Time32(TimeUnit::Second) => {
                    let array = column.as_primitive::<Time32SecondType>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Time32(TimeUnit::Millisecond) => {
                    let array = column.as_primitive::<Time32MillisecondType>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Date64 => {
                    // If the column is a Date64, we truncate it: milliseconds are
                    // converted to whole days (integer division rounds toward zero).
                    let array: Int32Array = column
                        .as_primitive::<Date64Type>()
                        .unary(|x| (x / 86_400_000) as _);

                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal32(_, _) => {
                    // Decimal32 already holds i32 unscaled values; copy them as is.
                    let array = column
                        .as_primitive::<Decimal32Type>()
                        .unary::<_, Int32Type>(|v| v);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal64(_, _) => {
                    // use the int32 to represent the decimal with low precision
                    // NOTE(review): `as i32` silently truncates values that do not fit;
                    // presumably the schema only routes low-precision decimals here.
                    let array = column
                        .as_primitive::<Decimal64Type>()
                        .unary::<_, Int32Type>(|v| v as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal128(_, _) => {
                    // use the int32 to represent the decimal with low precision
                    let array = column
                        .as_primitive::<Decimal128Type>()
                        .unary::<_, Int32Type>(|v| v as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal256(_, _) => {
                    // use the int32 to represent the decimal with low precision
                    // (narrowed first to i128 via `as_i128`, then to i32 via `as`)
                    let array = column
                        .as_primitive::<Decimal256Type>()
                        .unary::<_, Int32Type>(|v| v.as_i128() as i32);
                    write_primitive(typed, array.values(), levels)
                }
                d => Err(ParquetError::General(format!("Cannot coerce {d} to I32"))),
            }
        }
        ColumnWriter::BoolColumnWriter(typed) => {
            // Gather only the non-null booleans; no index list is passed below.
            let array = column.as_boolean();
            let values = get_bool_array_slice(array, indices.iter().copied());
            typed.write_batch_internal(
                values.as_slice(),
                None,
                levels.def_level_data().as_ref(),
                levels.rep_level_data().as_ref(),
                None,
                None,
                None,
            )
        }
        ColumnWriter::Int64ColumnWriter(typed) => {
            match column.data_type() {
                ArrowDataType::Date64 => {
                    // Date64's i64 millisecond values are reinterpreted as Int64
                    // without any unit conversion.
                    let array = column
                        .as_primitive::<Date64Type>()
                        .reinterpret_cast::<Int64Type>();

                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Int64 => {
                    let array = column.as_primitive::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::UInt64 => {
                    let values = column.as_primitive::<UInt64Type>().values();
                    // follow C++ implementation and use overflow/reinterpret cast from  u64 to i64 which will map
                    // `(i64::MAX as u64)..u64::MAX` to `i64::MIN..0`
                    let array = values.inner().typed_data::<i64>();
                    write_primitive(typed, array, levels)
                }
                ArrowDataType::Time64(TimeUnit::Microsecond) => {
                    let array = column.as_primitive::<Time64MicrosecondType>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Time64(TimeUnit::Nanosecond) => {
                    let array = column.as_primitive::<Time64NanosecondType>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Timestamp(unit, _) => match unit {
                    TimeUnit::Second => {
                        let array = column.as_primitive::<TimestampSecondType>();
                        write_primitive(typed, array.values(), levels)
                    }
                    TimeUnit::Millisecond => {
                        let array = column.as_primitive::<TimestampMillisecondType>();
                        write_primitive(typed, array.values(), levels)
                    }
                    TimeUnit::Microsecond => {
                        let array = column.as_primitive::<TimestampMicrosecondType>();
                        write_primitive(typed, array.values(), levels)
                    }
                    TimeUnit::Nanosecond => {
                        let array = column.as_primitive::<TimestampNanosecondType>();
                        write_primitive(typed, array.values(), levels)
                    }
                },
                ArrowDataType::Duration(unit) => match unit {
                    TimeUnit::Second => {
                        let array = column.as_primitive::<DurationSecondType>();
                        write_primitive(typed, array.values(), levels)
                    }
                    TimeUnit::Millisecond => {
                        let array = column.as_primitive::<DurationMillisecondType>();
                        write_primitive(typed, array.values(), levels)
                    }
                    TimeUnit::Microsecond => {
                        let array = column.as_primitive::<DurationMicrosecondType>();
                        write_primitive(typed, array.values(), levels)
                    }
                    TimeUnit::Nanosecond => {
                        let array = column.as_primitive::<DurationNanosecondType>();
                        write_primitive(typed, array.values(), levels)
                    }
                },
                ArrowDataType::Decimal64(_, _) => {
                    // Decimal64 already holds i64 unscaled values; reinterpret them.
                    let array = column
                        .as_primitive::<Decimal64Type>()
                        .reinterpret_cast::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal128(_, _) => {
                    // use the int64 to represent the decimal with low precision
                    // NOTE(review): `as i64` silently truncates values that do not fit.
                    let array = column
                        .as_primitive::<Decimal128Type>()
                        .unary::<_, Int64Type>(|v| v as i64);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal256(_, _) => {
                    // use the int64 to represent the decimal with low precision
                    let array = column
                        .as_primitive::<Decimal256Type>()
                        .unary::<_, Int64Type>(|v| v.as_i128() as i64);
                    write_primitive(typed, array.values(), levels)
                }
                d => Err(ParquetError::General(format!("Cannot coerce {d} to I64"))),
            }
        }
        ColumnWriter::Int96ColumnWriter(_typed) => {
            unreachable!("Currently unreachable because data type not supported")
        }
        ColumnWriter::FloatColumnWriter(typed) => {
            let array = column.as_primitive::<Float32Type>();
            write_primitive(typed, array.values(), levels)
        }
        ColumnWriter::DoubleColumnWriter(typed) => {
            let array = column.as_primitive::<Float64Type>();
            write_primitive(typed, array.values(), levels)
        }
        ColumnWriter::ByteArrayColumnWriter(_) => {
            unreachable!("should use ByteArrayWriter")
        }
        ColumnWriter::FixedLenByteArrayColumnWriter(typed) => {
            // Materialize only the non-null values as fixed-length byte arrays,
            // then write them without an index list.
            let bytes = match column.data_type() {
                ArrowDataType::Interval(interval_unit) => match interval_unit {
                    IntervalUnit::YearMonth => {
                        let array = column.as_primitive::<IntervalYearMonthType>();
                        get_interval_ym_array_slice(array, indices.iter().copied())
                    }
                    IntervalUnit::DayTime => {
                        let array = column.as_primitive::<IntervalDayTimeType>();
                        get_interval_dt_array_slice(array, indices.iter().copied())
                    }
                    _ => {
                        return Err(ParquetError::NYI(format!(
                            "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
                        )));
                    }
                },
                ArrowDataType::FixedSizeBinary(_) => {
                    let array = column.as_fixed_size_binary();
                    get_fsb_array_slice(array, indices.iter().copied())
                }
                ArrowDataType::Decimal32(_, _) => {
                    let array = column.as_primitive::<Decimal32Type>();
                    get_decimal_32_array_slice(array, indices.iter().copied())
                }
                ArrowDataType::Decimal64(_, _) => {
                    let array = column.as_primitive::<Decimal64Type>();
                    get_decimal_64_array_slice(array, indices.iter().copied())
                }
                ArrowDataType::Decimal128(_, _) => {
                    let array = column.as_primitive::<Decimal128Type>();
                    get_decimal_128_array_slice(array, indices.iter().copied())
                }
                ArrowDataType::Decimal256(_, _) => {
                    let array = column.as_primitive::<Decimal256Type>();
                    get_decimal_256_array_slice(array, indices.iter().copied())
                }
                ArrowDataType::Float16 => {
                    let array = column.as_primitive::<Float16Type>();
                    get_float_16_array_slice(array, indices.iter().copied())
                }
                _ => {
                    return Err(ParquetError::NYI(
                        "Attempting to write an Arrow type that is not yet implemented".to_string(),
                    ));
                }
            };
            typed.write_batch_internal(
                bytes.as_slice(),
                None,
                levels.def_level_data().as_ref(),
                levels.rep_level_data().as_ref(),
                None,
                None,
                None,
            )
        }
    }
}
1793
1794fn write_primitive<E: ColumnValueEncoder>(
1795    writer: &mut GenericColumnWriter<E>,
1796    values: &E::Values,
1797    levels: &ArrayLevels,
1798) -> Result<usize> {
1799    writer.write_batch_internal(
1800        values,
1801        Some(levels.non_null_indices()),
1802        levels.def_level_data().as_ref(),
1803        levels.rep_level_data().as_ref(),
1804        None,
1805        None,
1806        None,
1807    )
1808}
1809
1810fn get_bool_array_slice(
1811    array: &arrow_array::BooleanArray,
1812    indices: impl ExactSizeIterator<Item = usize>,
1813) -> Vec<bool> {
1814    let mut values = Vec::with_capacity(indices.len());
1815    for i in indices {
1816        values.push(array.value(i))
1817    }
1818    values
1819}
1820
1821/// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each).
1822/// An Arrow YearMonth interval only stores months, thus only the first 4 bytes are populated.
1823fn get_interval_ym_array_slice(
1824    array: &arrow_array::IntervalYearMonthArray,
1825    indices: impl ExactSizeIterator<Item = usize>,
1826) -> Vec<FixedLenByteArray> {
1827    let mut values = Vec::with_capacity(indices.len());
1828    for i in indices {
1829        let mut value = array.value(i).to_le_bytes().to_vec();
1830        let mut suffix = vec![0; 8];
1831        value.append(&mut suffix);
1832        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1833    }
1834    values
1835}
1836
1837/// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each).
1838/// An Arrow DayTime interval only stores days and millis, thus the first 4 bytes are not populated.
1839fn get_interval_dt_array_slice(
1840    array: &arrow_array::IntervalDayTimeArray,
1841    indices: impl ExactSizeIterator<Item = usize>,
1842) -> Vec<FixedLenByteArray> {
1843    let mut values = Vec::with_capacity(indices.len());
1844    for i in indices {
1845        let mut out = [0; 12];
1846        let value = array.value(i);
1847        out[4..8].copy_from_slice(&value.days.to_le_bytes());
1848        out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1849        values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1850    }
1851    values
1852}
1853
1854fn get_decimal_32_array_slice(
1855    array: &arrow_array::Decimal32Array,
1856    indices: impl ExactSizeIterator<Item = usize>,
1857) -> Vec<FixedLenByteArray> {
1858    let mut values = Vec::with_capacity(indices.len());
1859    let size = decimal_length_from_precision(array.precision());
1860    for i in indices {
1861        let as_be_bytes = array.value(i).to_be_bytes();
1862        let resized_value = as_be_bytes[(4 - size)..].to_vec();
1863        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1864    }
1865    values
1866}
1867
1868fn get_decimal_64_array_slice(
1869    array: &arrow_array::Decimal64Array,
1870    indices: impl ExactSizeIterator<Item = usize>,
1871) -> Vec<FixedLenByteArray> {
1872    let mut values = Vec::with_capacity(indices.len());
1873    let size = decimal_length_from_precision(array.precision());
1874    for i in indices {
1875        let as_be_bytes = array.value(i).to_be_bytes();
1876        let resized_value = as_be_bytes[(8 - size)..].to_vec();
1877        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1878    }
1879    values
1880}
1881
1882fn get_decimal_128_array_slice(
1883    array: &arrow_array::Decimal128Array,
1884    indices: impl ExactSizeIterator<Item = usize>,
1885) -> Vec<FixedLenByteArray> {
1886    let mut values = Vec::with_capacity(indices.len());
1887    let size = decimal_length_from_precision(array.precision());
1888    for i in indices {
1889        let as_be_bytes = array.value(i).to_be_bytes();
1890        let resized_value = as_be_bytes[(16 - size)..].to_vec();
1891        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1892    }
1893    values
1894}
1895
1896fn get_decimal_256_array_slice(
1897    array: &arrow_array::Decimal256Array,
1898    indices: impl ExactSizeIterator<Item = usize>,
1899) -> Vec<FixedLenByteArray> {
1900    let mut values = Vec::with_capacity(indices.len());
1901    let size = decimal_length_from_precision(array.precision());
1902    for i in indices {
1903        let as_be_bytes = array.value(i).to_be_bytes();
1904        let resized_value = as_be_bytes[(32 - size)..].to_vec();
1905        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1906    }
1907    values
1908}
1909
1910fn get_float_16_array_slice(
1911    array: &arrow_array::Float16Array,
1912    indices: impl ExactSizeIterator<Item = usize>,
1913) -> Vec<FixedLenByteArray> {
1914    let mut values = Vec::with_capacity(indices.len());
1915    for i in indices {
1916        let value = array.value(i).to_le_bytes().to_vec();
1917        values.push(FixedLenByteArray::from(ByteArray::from(value)));
1918    }
1919    values
1920}
1921
1922fn get_fsb_array_slice(
1923    array: &arrow_array::FixedSizeBinaryArray,
1924    indices: impl ExactSizeIterator<Item = usize>,
1925) -> Vec<FixedLenByteArray> {
1926    let mut values = Vec::with_capacity(indices.len());
1927    for i in indices {
1928        let value = array.value(i).to_vec();
1929        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1930    }
1931    values
1932}
1933
1934#[cfg(test)]
1935mod tests {
1936    use super::*;
1937    use std::collections::HashMap;
1938
1939    use std::fs::File;
1940
1941    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1942    use crate::arrow::{ARROW_SCHEMA_META_KEY, PARQUET_FIELD_ID_META_KEY};
1943    use crate::column::page::{Page, PageReader};
1944    use crate::file::metadata::thrift::PageHeader;
1945    use crate::file::page_index::column_index::ColumnIndexMetaData;
1946    use crate::file::reader::SerializedPageReader;
1947    use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
1948    use crate::schema::types::ColumnPath;
1949    use arrow::datatypes::ToByteSlice;
1950    use arrow::datatypes::{DataType, Schema};
1951    use arrow::error::Result as ArrowResult;
1952    use arrow::util::data_gen::create_random_array;
1953    use arrow::util::pretty::pretty_format_batches;
1954    use arrow::{array::*, buffer::Buffer};
1955    use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer, OffsetBuffer, i256};
1956    use arrow_schema::Fields;
1957    use half::f16;
1958    use num_traits::{FromPrimitive, ToPrimitive};
1959    use tempfile::tempfile;
1960
1961    use crate::basic::Encoding;
1962    use crate::data_type::AsBytes;
1963    use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
1964    use crate::file::properties::{
1965        BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1966    };
1967    use crate::file::serialized_reader::ReadOptionsBuilder;
1968    use crate::file::{
1969        reader::{FileReader, SerializedFileReader},
1970        statistics::Statistics,
1971    };
1972
1973    /// A [`PageStore`] that allocates *sparse, non-contiguous* handles and keeps
1974    /// blobs in a `HashMap` — nothing like the default `Vec<Bytes>`. Used to
1975    /// prove the writer relies only on the opaque-handle contract and never on
1976    /// handles being dense `Vec` indices. Records how many blobs were stored.
1977    #[derive(Debug, Default)]
1978    struct RecordingPageStore {
1979        next: u64,
1980        blobs: HashMap<u64, Bytes>,
1981        puts: Arc<std::sync::atomic::AtomicUsize>,
1982    }
1983
1984    impl PageStore for RecordingPageStore {
1985        fn put(&mut self, value: Bytes) -> Result<PageKey> {
1986            // Deliberately non-sequential, never-zero handles.
1987            let id = 100 + self.next * 7;
1988            self.next += 1;
1989            self.puts.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1990            self.blobs.insert(id, value);
1991            Ok(PageKey::new(id))
1992        }
1993
1994        fn take(&mut self, key: PageKey) -> Result<Bytes> {
1995            self.blobs
1996                .remove(&key.get())
1997                .ok_or_else(|| ParquetError::General(format!("missing key {}", key.get())))
1998        }
1999    }
2000
2001    #[derive(Debug)]
2002    struct RecordingPageStoreFactory {
2003        puts: Arc<std::sync::atomic::AtomicUsize>,
2004    }
2005
2006    impl PageStoreFactory for RecordingPageStoreFactory {
2007        fn create(&self, _args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
2008            Ok(Box::new(RecordingPageStore {
2009                puts: self.puts.clone(),
2010                ..Default::default()
2011            }))
2012        }
2013    }
2014
2015    /// A custom [`PageStore`] must produce byte-identical files to the in-memory
2016    /// default, across dictionary and non-dictionary columns and multiple row
2017    /// groups (so multiple store instances are exercised).
2018    #[test]
2019    fn custom_page_store_is_byte_identical_to_default() {
2020        let schema = Arc::new(Schema::new(vec![
2021            Field::new("i", DataType::Int32, true),
2022            // A low-cardinality string column to exercise the dictionary path.
2023            Field::new("s", DataType::Utf8, true),
2024        ]));
2025        let i = Int32Array::from(vec![Some(1), None, Some(3), Some(4), Some(5), Some(6)]);
2026        let s = StringArray::from(vec![
2027            Some("a"),
2028            Some("bb"),
2029            Some("a"),
2030            None,
2031            Some("bb"),
2032            Some("ccc"),
2033        ]);
2034        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(i), Arc::new(s)]).unwrap();
2035
2036        // Small row groups so multiple column chunks (hence multiple store
2037        // instances) are produced.
2038        let props = WriterProperties::builder()
2039            .set_max_row_group_row_count(Some(3))
2040            .build();
2041
2042        let write = |factory: Option<Arc<dyn PageStoreFactory>>| {
2043            let mut buffer = Vec::new();
2044            let mut opts = ArrowWriterOptions::new().with_properties(props.clone());
2045            if let Some(factory) = factory {
2046                opts = opts.with_page_store_factory(factory);
2047            }
2048            let mut writer =
2049                ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap();
2050            writer.write(&batch).unwrap();
2051            writer.close().unwrap();
2052            buffer
2053        };
2054
2055        let default_bytes = write(None);
2056
2057        let puts = Arc::new(std::sync::atomic::AtomicUsize::new(0));
2058        let custom_bytes = write(Some(Arc::new(RecordingPageStoreFactory {
2059            puts: puts.clone(),
2060        })));
2061
2062        assert!(
2063            puts.load(std::sync::atomic::Ordering::Relaxed) > 0,
2064            "custom PageStore was never written to"
2065        );
2066        assert_eq!(
2067            default_bytes, custom_bytes,
2068            "a custom PageStore must produce byte-identical output to the default"
2069        );
2070    }
2071
2072    /// A dictionary-encoded column written through the deferred-ordering Arrow
2073    /// path must round-trip correctly even with the offset index disabled, when
2074    /// only the chunk-level dictionary/data page offsets are rewritten (there is
2075    /// no offset index to rebuild). Spans multiple data pages so the
2076    /// dictionary-first reordering is exercised.
2077    #[test]
2078    fn dictionary_column_round_trips_with_offset_index_disabled() {
2079        let schema = Arc::new(Schema::new(vec![Field::new("k", DataType::Int32, true)]));
2080
2081        // Low cardinality so the column stays dictionary-encoded; enough rows to
2082        // span several data pages within a single row group.
2083        let values: Vec<Option<i32>> = (0..50_000).map(|i| Some(i % 8)).collect();
2084        let array = Int32Array::from(values.clone());
2085        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
2086
2087        let props = WriterProperties::builder()
2088            .set_offset_index_disabled(true)
2089            .set_data_page_row_count_limit(4096)
2090            .build();
2091        let opts = ArrowWriterOptions::new().with_properties(props);
2092
2093        let mut buffer = Vec::new();
2094        let mut writer =
2095            ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap();
2096        writer.write(&batch).unwrap();
2097        writer.close().unwrap();
2098
2099        let reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), values.len()).unwrap();
2100        let read: Vec<RecordBatch> = reader.collect::<ArrowResult<_>>().unwrap();
2101        let read_values: Vec<Option<i32>> = read
2102            .iter()
2103            .flat_map(|b| b.column(0).as_primitive::<Int32Type>().iter())
2104            .collect();
2105        assert_eq!(read_values, values);
2106    }
2107
2108    /// The dictionary page is routed through the [`PageStore`] like any other
2109    /// page rather than held resident in memory, so a dictionary column chunk's
2110    /// *entire* serialized size — dictionary page included — passes through the
2111    /// store.
2112    #[test]
2113    fn dictionary_page_is_routed_through_the_store() {
2114        /// A store that sums the bytes handed to `put`.
2115        #[derive(Debug, Default)]
2116        struct SizeRecordingPageStore {
2117            blobs: Vec<Bytes>,
2118            bytes_put: Arc<std::sync::atomic::AtomicUsize>,
2119        }
2120        impl PageStore for SizeRecordingPageStore {
2121            fn put(&mut self, value: Bytes) -> Result<PageKey> {
2122                self.bytes_put
2123                    .fetch_add(value.len(), std::sync::atomic::Ordering::Relaxed);
2124                let key = PageKey::new(self.blobs.len() as u64);
2125                self.blobs.push(value);
2126                Ok(key)
2127            }
2128            fn take(&mut self, key: PageKey) -> Result<Bytes> {
2129                Ok(std::mem::take(&mut self.blobs[key.get() as usize]))
2130            }
2131        }
2132        #[derive(Debug)]
2133        struct Factory {
2134            bytes_put: Arc<std::sync::atomic::AtomicUsize>,
2135        }
2136        impl PageStoreFactory for Factory {
2137            fn create(&self, _args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
2138                Ok(Box::new(SizeRecordingPageStore {
2139                    bytes_put: self.bytes_put.clone(),
2140                    ..Default::default()
2141                }))
2142            }
2143        }
2144
2145        let schema = Arc::new(Schema::new(vec![Field::new("s", DataType::Utf8, false)]));
2146        // Low cardinality keeps the column dictionary-encoded with a real,
2147        // non-empty dictionary page.
2148        let values: Vec<&str> = (0..2048)
2149            .map(|i| ["alpha", "beta", "gamma", "delta"][i % 4])
2150            .collect();
2151        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(StringArray::from(values))])
2152            .unwrap();
2153
2154        let bytes_put = Arc::new(std::sync::atomic::AtomicUsize::new(0));
2155        let opts = ArrowWriterOptions::new().with_page_store_factory(Arc::new(Factory {
2156            bytes_put: bytes_put.clone(),
2157        }));
2158
2159        // A single batch / single column means exactly one row group and one
2160        // store instance, so the bytes it saw map to one column chunk.
2161        let mut buffer = Vec::new();
2162        let mut writer =
2163            ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap();
2164        writer.write(&batch).unwrap();
2165        writer.close().unwrap();
2166
2167        let reader = SerializedFileReader::new(Bytes::from(buffer)).unwrap();
2168        let column = reader.metadata().row_group(0).column(0);
2169        assert!(
2170            column.dictionary_page_offset().is_some(),
2171            "expected the column to be dictionary-encoded"
2172        );
2173
2174        // The bytes the store was handed must account for the whole chunk,
2175        // dictionary page included. Holding the dictionary page apart from the
2176        // store would make this fall short by the dictionary page's size.
2177        assert_eq!(
2178            bytes_put.load(std::sync::atomic::Ordering::Relaxed) as i64,
2179            column.compressed_size(),
2180            "the dictionary page must pass through the store like any other page"
2181        );
2182    }
2183
2184    #[test]
2185    fn arrow_writer() {
2186        // define schema
2187        let schema = Schema::new(vec![
2188            Field::new("a", DataType::Int32, false),
2189            Field::new("b", DataType::Int32, true),
2190        ]);
2191
2192        // create some data
2193        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2194        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2195
2196        // build a record batch
2197        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
2198
2199        roundtrip(batch, Some(SMALL_SIZE / 2));
2200    }
2201
2202    fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
2203        let mut buffer = vec![];
2204
2205        let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
2206        writer.write(expected_batch).unwrap();
2207        writer.close().unwrap();
2208
2209        buffer
2210    }
2211
2212    fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
2213        let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
2214        writer.write(expected_batch).unwrap();
2215        writer.into_inner().unwrap()
2216    }
2217
2218    #[test]
2219    fn roundtrip_bytes() {
2220        // define schema
2221        let schema = Arc::new(Schema::new(vec![
2222            Field::new("a", DataType::Int32, false),
2223            Field::new("b", DataType::Int32, true),
2224        ]));
2225
2226        // create some data
2227        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2228        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2229
2230        // build a record batch
2231        let expected_batch =
2232            RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
2233
2234        for buffer in [
2235            get_bytes_after_close(schema.clone(), &expected_batch),
2236            get_bytes_by_into_inner(schema, &expected_batch),
2237        ] {
2238            let cursor = Bytes::from(buffer);
2239            let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
2240
2241            let actual_batch = record_batch_reader
2242                .next()
2243                .expect("No batch found")
2244                .expect("Unable to get batch");
2245
2246            assert_eq!(expected_batch.schema(), actual_batch.schema());
2247            assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
2248            assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
2249            for i in 0..expected_batch.num_columns() {
2250                let expected_data = expected_batch.column(i).to_data();
2251                let actual_data = actual_batch.column(i).to_data();
2252
2253                assert_eq!(expected_data, actual_data);
2254            }
2255        }
2256    }
2257
2258    #[test]
2259    fn arrow_writer_non_null() {
2260        // define schema
2261        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2262
2263        // create some data
2264        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2265
2266        // build a record batch
2267        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2268
2269        roundtrip(batch, Some(SMALL_SIZE / 2));
2270    }
2271
2272    #[test]
2273    fn arrow_writer_list() {
2274        // define schema
2275        let schema = Schema::new(vec![Field::new(
2276            "a",
2277            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
2278            true,
2279        )]);
2280
2281        // create some data
2282        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2283
2284        // Construct a buffer for value offsets, for the nested array:
2285        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
2286        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2287
2288        // Construct a list array from the above two
2289        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2290            DataType::Int32,
2291            false,
2292        ))))
2293        .len(5)
2294        .add_buffer(a_value_offsets)
2295        .add_child_data(a_values.into_data())
2296        .null_bit_buffer(Some(Buffer::from([0b00011011])))
2297        .build()
2298        .unwrap();
2299        let a = ListArray::from(a_list_data);
2300
2301        // build a record batch
2302        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2303
2304        assert_eq!(batch.column(0).null_count(), 1);
2305
2306        // This test fails if the max row group size is less than the batch's length
2307        // see https://github.com/apache/arrow-rs/issues/518
2308        roundtrip(batch, None);
2309    }
2310
2311    #[test]
2312    fn arrow_writer_list_non_null() {
2313        // define schema
2314        let schema = Schema::new(vec![Field::new(
2315            "a",
2316            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
2317            false,
2318        )]);
2319
2320        // create some data
2321        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2322
2323        // Construct a buffer for value offsets, for the nested array:
2324        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
2325        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2326
2327        // Construct a list array from the above two
2328        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2329            DataType::Int32,
2330            false,
2331        ))))
2332        .len(5)
2333        .add_buffer(a_value_offsets)
2334        .add_child_data(a_values.into_data())
2335        .build()
2336        .unwrap();
2337        let a = ListArray::from(a_list_data);
2338
2339        // build a record batch
2340        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2341
2342        // This test fails if the max row group size is less than the batch's length
2343        // see https://github.com/apache/arrow-rs/issues/518
2344        assert_eq!(batch.column(0).null_count(), 0);
2345
2346        roundtrip(batch, None);
2347    }
2348
2349    #[test]
2350    fn arrow_writer_list_view() {
2351        let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2352        let schema = Schema::new(vec![Field::new(
2353            "a",
2354            DataType::ListView(list_field.clone()),
2355            true,
2356        )]);
2357
2358        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
2359        let a = ListViewArray::new(
2360            list_field,
2361            vec![0, 1, 0, 3, 6].into(),
2362            vec![1, 2, 0, 3, 4].into(),
2363            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2364            Some(vec![true, true, false, true, true].into()),
2365        );
2366
2367        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2368
2369        assert_eq!(batch.column(0).null_count(), 1);
2370
2371        roundtrip(batch, None);
2372    }
2373
2374    #[test]
2375    fn arrow_writer_list_view_non_null() {
2376        let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2377        let schema = Schema::new(vec![Field::new(
2378            "a",
2379            DataType::ListView(list_field.clone()),
2380            false,
2381        )]);
2382
2383        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
2384        let a = ListViewArray::new(
2385            list_field,
2386            vec![0, 1, 0, 3, 6].into(),
2387            vec![1, 2, 0, 3, 4].into(),
2388            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2389            None,
2390        );
2391
2392        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2393
2394        assert_eq!(batch.column(0).null_count(), 0);
2395
2396        roundtrip(batch, None);
2397    }
2398
2399    #[test]
2400    fn arrow_writer_list_view_out_of_order() {
2401        let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2402        let schema = Schema::new(vec![Field::new(
2403            "a",
2404            DataType::ListView(list_field.clone()),
2405            false,
2406        )]);
2407
2408        // [[1], [2, 3], [], [7, 8, 9, 10], [4, 5, 6]] - out of order offsets
2409        let a = ListViewArray::new(
2410            list_field,
2411            vec![0, 1, 0, 6, 3].into(),
2412            vec![1, 2, 0, 4, 3].into(),
2413            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2414            None,
2415        );
2416
2417        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2418
2419        roundtrip(batch, None);
2420    }
2421
2422    #[test]
2423    fn arrow_writer_large_list_view() {
2424        let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2425        let schema = Schema::new(vec![Field::new(
2426            "a",
2427            DataType::LargeListView(list_field.clone()),
2428            true,
2429        )]);
2430
2431        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
2432        let a = LargeListViewArray::new(
2433            list_field,
2434            vec![0i64, 1, 0, 3, 6].into(),
2435            vec![1i64, 2, 0, 3, 4].into(),
2436            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2437            Some(vec![true, true, false, true, true].into()),
2438        );
2439
2440        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2441
2442        assert_eq!(batch.column(0).null_count(), 1);
2443
2444        roundtrip(batch, None);
2445    }
2446
2447    #[test]
2448    fn arrow_writer_list_view_with_struct() {
2449        // Test ListView containing Struct: ListView<Struct<Int32, Utf8>>
2450        let struct_fields = Fields::from(vec![
2451            Field::new("id", DataType::Int32, false),
2452            Field::new("name", DataType::Utf8, false),
2453        ]);
2454        let struct_type = DataType::Struct(struct_fields.clone());
2455        let list_field = Arc::new(Field::new("item", struct_type.clone(), false));
2456
2457        let schema = Schema::new(vec![Field::new(
2458            "a",
2459            DataType::ListView(list_field.clone()),
2460            true,
2461        )]);
2462
2463        // Create struct values
2464        let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
2465        let name_array = StringArray::from(vec!["a", "b", "c", "d", "e"]);
2466        let struct_array = StructArray::new(
2467            struct_fields,
2468            vec![Arc::new(id_array), Arc::new(name_array)],
2469            None,
2470        );
2471
2472        // Create ListView: [{1, "a"}, {2, "b"}], null, [{3, "c"}, {4, "d"}, {5, "e"}]
2473        let list_view = ListViewArray::new(
2474            list_field,
2475            vec![0, 2, 2].into(), // offsets
2476            vec![2, 0, 3].into(), // sizes
2477            Arc::new(struct_array),
2478            Some(vec![true, false, true].into()),
2479        );
2480
2481        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_view)]).unwrap();
2482
2483        roundtrip(batch, None);
2484    }
2485
2486    #[test]
2487    fn arrow_writer_binary() {
2488        let string_field = Field::new("a", DataType::Utf8, false);
2489        let binary_field = Field::new("b", DataType::Binary, false);
2490        let schema = Schema::new(vec![string_field, binary_field]);
2491
2492        let raw_string_values = vec!["foo", "bar", "baz", "quux"];
2493        let raw_binary_values = [
2494            b"foo".to_vec(),
2495            b"bar".to_vec(),
2496            b"baz".to_vec(),
2497            b"quux".to_vec(),
2498        ];
2499        let raw_binary_value_refs = raw_binary_values
2500            .iter()
2501            .map(|x| x.as_slice())
2502            .collect::<Vec<_>>();
2503
2504        let string_values = StringArray::from(raw_string_values.clone());
2505        let binary_values = BinaryArray::from(raw_binary_value_refs);
2506        let batch = RecordBatch::try_new(
2507            Arc::new(schema),
2508            vec![Arc::new(string_values), Arc::new(binary_values)],
2509        )
2510        .unwrap();
2511
2512        roundtrip(batch, Some(SMALL_SIZE / 2));
2513    }
2514
2515    #[test]
2516    fn arrow_writer_binary_view() {
2517        let string_field = Field::new("a", DataType::Utf8View, false);
2518        let binary_field = Field::new("b", DataType::BinaryView, false);
2519        let nullable_string_field = Field::new("a", DataType::Utf8View, true);
2520        let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
2521
2522        let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
2523        let raw_binary_values = vec![
2524            b"foo".to_vec(),
2525            b"bar".to_vec(),
2526            b"large payload over 12 bytes".to_vec(),
2527            b"lulu".to_vec(),
2528        ];
2529        let nullable_string_values =
2530            vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
2531
2532        let string_view_values = StringViewArray::from(raw_string_values);
2533        let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
2534        let nullable_string_view_values = StringViewArray::from(nullable_string_values);
2535        let batch = RecordBatch::try_new(
2536            Arc::new(schema),
2537            vec![
2538                Arc::new(string_view_values),
2539                Arc::new(binary_view_values),
2540                Arc::new(nullable_string_view_values),
2541            ],
2542        )
2543        .unwrap();
2544
2545        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
2546        roundtrip(batch, None);
2547    }
2548
2549    #[test]
2550    fn arrow_writer_binary_view_long_value() {
2551        let string_field = Field::new("a", DataType::Utf8View, false);
2552        let binary_field = Field::new("b", DataType::BinaryView, false);
2553        let schema = Schema::new(vec![string_field, binary_field]);
2554
2555        // There is special case validation for long values (greater than 128)
2556        // 128 encodes as 0x80 0x00 0x00 0x00 in little endian, which should
2557        // trigger the long-string UTF-8 validation branch in the plain decoder.
2558        let long = "a".repeat(128);
2559        let raw_string_values = vec!["foo", long.as_str(), "bar"];
2560        let raw_binary_values = vec![b"foo".to_vec(), long.as_bytes().to_vec(), b"bar".to_vec()];
2561
2562        let string_view_values: ArrayRef = Arc::new(StringViewArray::from(raw_string_values));
2563        let binary_view_values: ArrayRef =
2564            Arc::new(BinaryViewArray::from_iter_values(raw_binary_values));
2565
2566        one_column_roundtrip(Arc::clone(&string_view_values), false);
2567        one_column_roundtrip(Arc::clone(&binary_view_values), false);
2568
2569        let batch = RecordBatch::try_new(
2570            Arc::new(schema),
2571            vec![string_view_values, binary_view_values],
2572        )
2573        .unwrap();
2574
2575        // Disable dictionary to exercise plain encoding paths in the reader.
2576        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2577            let props = WriterProperties::builder()
2578                .set_writer_version(version)
2579                .set_dictionary_enabled(false)
2580                .build();
2581            roundtrip_opts(&batch, props);
2582        }
2583    }
2584
2585    fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
2586        let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
2587        let schema = Schema::new(vec![decimal_field]);
2588
2589        let decimal_values = vec![10_000, 50_000, 0, -100]
2590            .into_iter()
2591            .map(Some)
2592            .collect::<Decimal128Array>()
2593            .with_precision_and_scale(precision, scale)
2594            .unwrap();
2595
2596        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
2597    }
2598
2599    #[test]
2600    fn arrow_writer_decimal() {
2601        // int32 to store the decimal value
2602        let batch_int32_decimal = get_decimal_batch(5, 2);
2603        roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
2604        // int64 to store the decimal value
2605        let batch_int64_decimal = get_decimal_batch(12, 2);
2606        roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
2607        // fixed_length_byte_array to store the decimal value
2608        let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
2609        roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
2610    }
2611
2612    #[test]
2613    fn arrow_writer_complex() {
2614        // define schema
2615        let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
2616        let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
2617        let struct_field_g = Arc::new(Field::new_list(
2618            "g",
2619            Field::new_list_field(DataType::Int16, true),
2620            false,
2621        ));
2622        let struct_field_h = Arc::new(Field::new_list(
2623            "h",
2624            Field::new_list_field(DataType::Int16, false),
2625            true,
2626        ));
2627        let struct_field_e = Arc::new(Field::new_struct(
2628            "e",
2629            vec![
2630                struct_field_f.clone(),
2631                struct_field_g.clone(),
2632                struct_field_h.clone(),
2633            ],
2634            false,
2635        ));
2636        let schema = Schema::new(vec![
2637            Field::new("a", DataType::Int32, false),
2638            Field::new("b", DataType::Int32, true),
2639            Field::new_struct(
2640                "c",
2641                vec![struct_field_d.clone(), struct_field_e.clone()],
2642                false,
2643            ),
2644        ]);
2645
2646        // create some data
2647        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2648        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2649        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
2650        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
2651
2652        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2653
2654        // Construct a buffer for value offsets, for the nested array:
2655        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
2656        let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2657
2658        // Construct a list array from the above two
2659        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
2660            .len(5)
2661            .add_buffer(g_value_offsets.clone())
2662            .add_child_data(g_value.to_data())
2663            .build()
2664            .unwrap();
2665        let g = ListArray::from(g_list_data);
2666        // The difference between g and h is that h has a null bitmap
2667        let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
2668            .len(5)
2669            .add_buffer(g_value_offsets)
2670            .add_child_data(g_value.to_data())
2671            .null_bit_buffer(Some(Buffer::from([0b00011011])))
2672            .build()
2673            .unwrap();
2674        let h = ListArray::from(h_list_data);
2675
2676        let e = StructArray::from(vec![
2677            (struct_field_f, Arc::new(f) as ArrayRef),
2678            (struct_field_g, Arc::new(g) as ArrayRef),
2679            (struct_field_h, Arc::new(h) as ArrayRef),
2680        ]);
2681
2682        let c = StructArray::from(vec![
2683            (struct_field_d, Arc::new(d) as ArrayRef),
2684            (struct_field_e, Arc::new(e) as ArrayRef),
2685        ]);
2686
2687        // build a record batch
2688        let batch = RecordBatch::try_new(
2689            Arc::new(schema),
2690            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
2691        )
2692        .unwrap();
2693
2694        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
2695        roundtrip(batch, Some(SMALL_SIZE / 3));
2696    }
2697
2698    #[test]
2699    fn arrow_writer_complex_mixed() {
2700        // This test was added while investigating https://github.com/apache/arrow-rs/issues/244.
2701        // It was subsequently fixed while investigating https://github.com/apache/arrow-rs/issues/245.
2702
2703        // define schema
2704        let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
2705        let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
2706        let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
2707        let schema = Schema::new(vec![Field::new(
2708            "some_nested_object",
2709            DataType::Struct(Fields::from(vec![
2710                offset_field.clone(),
2711                partition_field.clone(),
2712                topic_field.clone(),
2713            ])),
2714            false,
2715        )]);
2716
2717        // create some data
2718        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
2719        let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2720        let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
2721
2722        let some_nested_object = StructArray::from(vec![
2723            (offset_field, Arc::new(offset) as ArrayRef),
2724            (partition_field, Arc::new(partition) as ArrayRef),
2725            (topic_field, Arc::new(topic) as ArrayRef),
2726        ]);
2727
2728        // build a record batch
2729        let batch =
2730            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
2731
2732        roundtrip(batch, Some(SMALL_SIZE / 2));
2733    }
2734
2735    #[test]
2736    fn arrow_writer_map() {
2737        // Note: we are using the JSON Arrow reader for brevity
2738        let json_content = r#"
2739        {"stocks":{"long": "$AAA", "short": "$BBB"}}
2740        {"stocks":{"long": null, "long": "$CCC", "short": null}}
2741        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
2742        "#;
2743        let entries_struct_type = DataType::Struct(Fields::from(vec![
2744            Field::new("key", DataType::Utf8, false),
2745            Field::new("value", DataType::Utf8, true),
2746        ]));
2747        let stocks_field = Field::new(
2748            "stocks",
2749            DataType::Map(
2750                Arc::new(Field::new("entries", entries_struct_type, false)),
2751                false,
2752            ),
2753            true,
2754        );
2755        let schema = Arc::new(Schema::new(vec![stocks_field]));
2756        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
2757        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
2758
2759        let batch = reader.next().unwrap().unwrap();
2760        roundtrip(batch, None);
2761    }
2762
2763    #[test]
2764    fn arrow_writer_2_level_struct() {
2765        // tests writing <struct<struct<primitive>>
2766        let field_c = Field::new("c", DataType::Int32, true);
2767        let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
2768        let type_a = DataType::Struct(vec![field_b.clone()].into());
2769        let field_a = Field::new("a", type_a, true);
2770        let schema = Schema::new(vec![field_a.clone()]);
2771
2772        // create data
2773        let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
2774        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2775            .len(6)
2776            .null_bit_buffer(Some(Buffer::from([0b00100111])))
2777            .add_child_data(c.into_data())
2778            .build()
2779            .unwrap();
2780        let b = StructArray::from(b_data);
2781        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2782            .len(6)
2783            .null_bit_buffer(Some(Buffer::from([0b00101111])))
2784            .add_child_data(b.into_data())
2785            .build()
2786            .unwrap();
2787        let a = StructArray::from(a_data);
2788
2789        assert_eq!(a.null_count(), 1);
2790        assert_eq!(a.column(0).null_count(), 2);
2791
2792        // build a racord batch
2793        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2794
2795        roundtrip(batch, Some(SMALL_SIZE / 2));
2796    }
2797
2798    #[test]
2799    fn arrow_writer_2_level_struct_non_null() {
2800        // tests writing <struct<struct<primitive>>
2801        let field_c = Field::new("c", DataType::Int32, false);
2802        let type_b = DataType::Struct(vec![field_c].into());
2803        let field_b = Field::new("b", type_b.clone(), false);
2804        let type_a = DataType::Struct(vec![field_b].into());
2805        let field_a = Field::new("a", type_a.clone(), false);
2806        let schema = Schema::new(vec![field_a]);
2807
2808        // create data
2809        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2810        let b_data = ArrayDataBuilder::new(type_b)
2811            .len(6)
2812            .add_child_data(c.into_data())
2813            .build()
2814            .unwrap();
2815        let b = StructArray::from(b_data);
2816        let a_data = ArrayDataBuilder::new(type_a)
2817            .len(6)
2818            .add_child_data(b.into_data())
2819            .build()
2820            .unwrap();
2821        let a = StructArray::from(a_data);
2822
2823        assert_eq!(a.null_count(), 0);
2824        assert_eq!(a.column(0).null_count(), 0);
2825
2826        // build a racord batch
2827        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2828
2829        roundtrip(batch, Some(SMALL_SIZE / 2));
2830    }
2831
2832    #[test]
2833    fn arrow_writer_2_level_struct_mixed_null() {
2834        // tests writing <struct<struct<primitive>>
2835        let field_c = Field::new("c", DataType::Int32, false);
2836        let type_b = DataType::Struct(vec![field_c].into());
2837        let field_b = Field::new("b", type_b.clone(), true);
2838        let type_a = DataType::Struct(vec![field_b].into());
2839        let field_a = Field::new("a", type_a.clone(), false);
2840        let schema = Schema::new(vec![field_a]);
2841
2842        // create data
2843        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2844        let b_data = ArrayDataBuilder::new(type_b)
2845            .len(6)
2846            .null_bit_buffer(Some(Buffer::from([0b00100111])))
2847            .add_child_data(c.into_data())
2848            .build()
2849            .unwrap();
2850        let b = StructArray::from(b_data);
2851        // a intentionally has no null buffer, to test that this is handled correctly
2852        let a_data = ArrayDataBuilder::new(type_a)
2853            .len(6)
2854            .add_child_data(b.into_data())
2855            .build()
2856            .unwrap();
2857        let a = StructArray::from(a_data);
2858
2859        assert_eq!(a.null_count(), 0);
2860        assert_eq!(a.column(0).null_count(), 2);
2861
2862        // build a racord batch
2863        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2864
2865        roundtrip(batch, Some(SMALL_SIZE / 2));
2866    }
2867
2868    #[test]
2869    fn arrow_writer_2_level_struct_mixed_null_2() {
2870        // tests writing <struct<struct<primitive>>, where the primitive columns are non-null.
2871        let field_c = Field::new("c", DataType::Int32, false);
2872        let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
2873        let field_e = Field::new(
2874            "e",
2875            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2876            false,
2877        );
2878
2879        let field_b = Field::new(
2880            "b",
2881            DataType::Struct(vec![field_c, field_d, field_e].into()),
2882            false,
2883        );
2884        let type_a = DataType::Struct(vec![field_b.clone()].into());
2885        let field_a = Field::new("a", type_a, true);
2886        let schema = Schema::new(vec![field_a.clone()]);
2887
2888        // create data
2889        let c = Int32Array::from_iter_values(0..6);
2890        let d = FixedSizeBinaryArray::try_from_iter(
2891            ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
2892        )
2893        .expect("four byte values");
2894        let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
2895        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2896            .len(6)
2897            .add_child_data(c.into_data())
2898            .add_child_data(d.into_data())
2899            .add_child_data(e.into_data())
2900            .build()
2901            .unwrap();
2902        let b = StructArray::from(b_data);
2903        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2904            .len(6)
2905            .null_bit_buffer(Some(Buffer::from([0b00100101])))
2906            .add_child_data(b.into_data())
2907            .build()
2908            .unwrap();
2909        let a = StructArray::from(a_data);
2910
2911        assert_eq!(a.null_count(), 3);
2912        assert_eq!(a.column(0).null_count(), 0);
2913
2914        // build a record batch
2915        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2916
2917        roundtrip(batch, Some(SMALL_SIZE / 2));
2918    }
2919
2920    #[test]
2921    fn test_fixed_size_binary_in_dict() {
2922        fn test_fixed_size_binary_in_dict_inner<K>()
2923        where
2924            K: ArrowDictionaryKeyType,
2925            K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
2926            <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
2927        {
2928            let field = Field::new(
2929                "a",
2930                DataType::Dictionary(
2931                    Box::new(K::DATA_TYPE),
2932                    Box::new(DataType::FixedSizeBinary(4)),
2933                ),
2934                false,
2935            );
2936            let schema = Schema::new(vec![field]);
2937
2938            let keys: Vec<K::Native> = vec![
2939                K::Native::try_from(0u8).unwrap(),
2940                K::Native::try_from(0u8).unwrap(),
2941                K::Native::try_from(1u8).unwrap(),
2942            ];
2943            let keys = PrimitiveArray::<K>::from_iter_values(keys);
2944            let values = FixedSizeBinaryArray::try_from_iter(
2945                vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
2946            )
2947            .unwrap();
2948
2949            let data = DictionaryArray::<K>::new(keys, Arc::new(values));
2950            let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
2951            roundtrip(batch, None);
2952        }
2953
2954        test_fixed_size_binary_in_dict_inner::<UInt8Type>();
2955        test_fixed_size_binary_in_dict_inner::<UInt16Type>();
2956        test_fixed_size_binary_in_dict_inner::<UInt32Type>();
2957        test_fixed_size_binary_in_dict_inner::<UInt16Type>();
2958        test_fixed_size_binary_in_dict_inner::<Int8Type>();
2959        test_fixed_size_binary_in_dict_inner::<Int16Type>();
2960        test_fixed_size_binary_in_dict_inner::<Int32Type>();
2961        test_fixed_size_binary_in_dict_inner::<Int64Type>();
2962    }
2963
2964    #[test]
2965    fn test_empty_dict() {
2966        let struct_fields = Fields::from(vec![Field::new(
2967            "dict",
2968            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2969            false,
2970        )]);
2971
2972        let schema = Schema::new(vec![Field::new_struct(
2973            "struct",
2974            struct_fields.clone(),
2975            true,
2976        )]);
2977        let dictionary = Arc::new(DictionaryArray::new(
2978            Int32Array::new_null(5),
2979            Arc::new(StringArray::new_null(0)),
2980        ));
2981
2982        let s = StructArray::new(
2983            struct_fields,
2984            vec![dictionary],
2985            Some(NullBuffer::new_null(5)),
2986        );
2987
2988        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
2989        roundtrip(batch, None);
2990    }
2991    #[test]
2992    fn arrow_writer_page_size() {
2993        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
2994
2995        let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
2996
2997        // Generate an array of 10 unique 10 character string
2998        for i in 0..10 {
2999            let value = i
3000                .to_string()
3001                .repeat(10)
3002                .chars()
3003                .take(10)
3004                .collect::<String>();
3005
3006            builder.append_value(value);
3007        }
3008
3009        let array = Arc::new(builder.finish());
3010
3011        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
3012
3013        let file = tempfile::tempfile().unwrap();
3014
3015        // Set everything very low so we fallback to PLAIN encoding after the first row
3016        let props = WriterProperties::builder()
3017            .set_data_page_size_limit(1)
3018            .set_dictionary_page_size_limit(1)
3019            .set_write_batch_size(1)
3020            .build();
3021
3022        let mut writer =
3023            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
3024                .expect("Unable to write file");
3025        writer.write(&batch).unwrap();
3026        writer.close().unwrap();
3027
3028        let options = ReadOptionsBuilder::new().with_page_index().build();
3029        let reader =
3030            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap();
3031
3032        let column = reader.metadata().row_group(0).columns();
3033
3034        assert_eq!(column.len(), 1);
3035
3036        // We should write one row before falling back to PLAIN encoding so there should still be a
3037        // dictionary page.
3038        assert!(
3039            column[0].dictionary_page_offset().is_some(),
3040            "Expected a dictionary page"
3041        );
3042
3043        assert!(reader.metadata().offset_index().is_some());
3044        let offset_indexes = &reader.metadata().offset_index().unwrap()[0];
3045
3046        let page_locations = offset_indexes[0].page_locations.clone();
3047
3048        // We should fallback to PLAIN encoding after the first row and our max page size is 1 bytes
3049        // so we expect one dictionary encoded page and then a page per row thereafter.
3050        assert_eq!(
3051            page_locations.len(),
3052            10,
3053            "Expected 10 pages but got {page_locations:#?}"
3054        );
3055    }
3056
3057    #[test]
3058    fn arrow_writer_float_nans() {
3059        let f16_field = Field::new("a", DataType::Float16, false);
3060        let f32_field = Field::new("b", DataType::Float32, false);
3061        let f64_field = Field::new("c", DataType::Float64, false);
3062        let schema = Schema::new(vec![f16_field, f32_field, f64_field]);
3063
3064        let f16_values = (0..MEDIUM_SIZE)
3065            .map(|i| {
3066                Some(if i % 2 == 0 {
3067                    f16::NAN
3068                } else {
3069                    f16::from_f32(i as f32)
3070                })
3071            })
3072            .collect::<Float16Array>();
3073
3074        let f32_values = (0..MEDIUM_SIZE)
3075            .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
3076            .collect::<Float32Array>();
3077
3078        let f64_values = (0..MEDIUM_SIZE)
3079            .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
3080            .collect::<Float64Array>();
3081
3082        let batch = RecordBatch::try_new(
3083            Arc::new(schema),
3084            vec![
3085                Arc::new(f16_values),
3086                Arc::new(f32_values),
3087                Arc::new(f64_values),
3088            ],
3089        )
3090        .unwrap();
3091
3092        roundtrip(batch, None);
3093    }
3094
3095    const SMALL_SIZE: usize = 7;
3096    const MEDIUM_SIZE: usize = 63;
3097
3098    // Write the batch to parquet and read it back out, ensuring
3099    // that what comes out is the same as what was written in
3100    fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<Bytes> {
3101        let mut files = vec![];
3102        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
3103            let mut props = WriterProperties::builder().set_writer_version(version);
3104
3105            if let Some(size) = max_row_group_size {
3106                props = props.set_max_row_group_row_count(Some(size))
3107            }
3108
3109            let props = props.build();
3110            files.push(roundtrip_opts(&expected_batch, props))
3111        }
3112        files
3113    }
3114
3115    // Round trip the specified record batch with the specified writer properties,
3116    // to an in-memory file, and validate the arrays using the specified function.
3117    // Returns the in-memory file.
3118    fn roundtrip_opts_with_array_validation<F>(
3119        expected_batch: &RecordBatch,
3120        props: WriterProperties,
3121        validate: F,
3122    ) -> Bytes
3123    where
3124        F: Fn(&ArrayData, &ArrayData),
3125    {
3126        let mut file = vec![];
3127
3128        let mut writer = ArrowWriter::try_new(&mut file, expected_batch.schema(), Some(props))
3129            .expect("Unable to write file");
3130        writer.write(expected_batch).unwrap();
3131        writer.close().unwrap();
3132
3133        let file = Bytes::from(file);
3134        let mut record_batch_reader =
3135            ParquetRecordBatchReader::try_new(file.clone(), 1024).unwrap();
3136
3137        let actual_batch = record_batch_reader
3138            .next()
3139            .expect("No batch found")
3140            .expect("Unable to get batch");
3141
3142        assert_eq!(expected_batch.schema(), actual_batch.schema());
3143        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
3144        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
3145        for i in 0..expected_batch.num_columns() {
3146            let expected_data = expected_batch.column(i).to_data();
3147            let actual_data = actual_batch.column(i).to_data();
3148            validate(&expected_data, &actual_data);
3149        }
3150
3151        file
3152    }
3153
3154    fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> Bytes {
3155        roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
3156            a.validate_full().expect("valid expected data");
3157            b.validate_full().expect("valid actual data");
3158            assert_eq!(a, b)
3159        })
3160    }
3161
3162    struct RoundTripOptions {
3163        values: ArrayRef,
3164        schema: SchemaRef,
3165        bloom_filter: bool,
3166        bloom_filter_ndv: Option<u64>,
3167        bloom_filter_position: BloomFilterPosition,
3168    }
3169
3170    impl RoundTripOptions {
3171        fn new(values: ArrayRef, nullable: bool) -> Self {
3172            let data_type = values.data_type().clone();
3173            let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
3174            Self {
3175                values,
3176                schema: Arc::new(schema),
3177                bloom_filter: false,
3178                bloom_filter_ndv: None,
3179                bloom_filter_position: BloomFilterPosition::AfterRowGroup,
3180            }
3181        }
3182    }
3183
3184    fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<Bytes> {
3185        one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
3186    }
3187
3188    fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<Bytes> {
3189        let mut options = RoundTripOptions::new(values, false);
3190        options.schema = schema;
3191        one_column_roundtrip_with_options(options)
3192    }
3193
3194    fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<Bytes> {
3195        let RoundTripOptions {
3196            values,
3197            schema,
3198            bloom_filter,
3199            bloom_filter_ndv,
3200            bloom_filter_position,
3201        } = options;
3202
3203        let encodings = match values.data_type() {
3204            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
3205                vec![
3206                    Encoding::PLAIN,
3207                    Encoding::DELTA_BYTE_ARRAY,
3208                    Encoding::DELTA_LENGTH_BYTE_ARRAY,
3209                ]
3210            }
3211            DataType::Int64
3212            | DataType::Int32
3213            | DataType::Int16
3214            | DataType::Int8
3215            | DataType::UInt64
3216            | DataType::UInt32
3217            | DataType::UInt16
3218            | DataType::UInt8 => vec![
3219                Encoding::PLAIN,
3220                Encoding::DELTA_BINARY_PACKED,
3221                Encoding::BYTE_STREAM_SPLIT,
3222            ],
3223            DataType::Float32 | DataType::Float64 => {
3224                vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
3225            }
3226            _ => vec![Encoding::PLAIN],
3227        };
3228
3229        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3230
3231        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
3232
3233        let mut files = vec![];
3234        for dictionary_size in [0, 1, 1024] {
3235            for encoding in &encodings {
3236                for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
3237                    for row_group_size in row_group_sizes {
3238                        let mut builder = WriterProperties::builder()
3239                            .set_writer_version(version)
3240                            .set_max_row_group_row_count(Some(row_group_size))
3241                            .set_dictionary_enabled(dictionary_size != 0)
3242                            .set_dictionary_page_size_limit(dictionary_size.max(1))
3243                            .set_encoding(*encoding)
3244                            .set_bloom_filter_enabled(bloom_filter)
3245                            .set_bloom_filter_position(bloom_filter_position);
3246                        if let Some(ndv) = bloom_filter_ndv {
3247                            builder = builder.set_bloom_filter_max_ndv(ndv);
3248                        }
3249                        let props = builder.build();
3250
3251                        files.push(roundtrip_opts(&expected_batch, props))
3252                    }
3253                }
3254            }
3255        }
3256        files
3257    }
3258
3259    fn values_required<A, I>(iter: I) -> Vec<Bytes>
3260    where
3261        A: From<Vec<I::Item>> + Array + 'static,
3262        I: IntoIterator,
3263    {
3264        let raw_values: Vec<_> = iter.into_iter().collect();
3265        let values = Arc::new(A::from(raw_values));
3266        one_column_roundtrip(values, false)
3267    }
3268
3269    fn values_optional<A, I>(iter: I) -> Vec<Bytes>
3270    where
3271        A: From<Vec<Option<I::Item>>> + Array + 'static,
3272        I: IntoIterator,
3273    {
3274        let optional_raw_values: Vec<_> = iter
3275            .into_iter()
3276            .enumerate()
3277            .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
3278            .collect();
3279        let optional_values = Arc::new(A::from(optional_raw_values));
3280        one_column_roundtrip(optional_values, true)
3281    }
3282
3283    fn required_and_optional<A, I>(iter: I)
3284    where
3285        A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
3286        I: IntoIterator + Clone,
3287    {
3288        values_required::<A, I>(iter.clone());
3289        values_optional::<A, I>(iter);
3290    }
3291
3292    fn check_bloom_filter<T: AsBytes>(
3293        files: Vec<Bytes>,
3294        file_column: String,
3295        positive_values: Vec<T>,
3296        negative_values: Vec<T>,
3297    ) {
3298        files.into_iter().take(1).for_each(|file| {
3299            let file_reader = SerializedFileReader::new_with_options(
3300                file,
3301                ReadOptionsBuilder::new()
3302                    .with_reader_properties(
3303                        ReaderProperties::builder()
3304                            .set_read_bloom_filter(true)
3305                            .build(),
3306                    )
3307                    .build(),
3308            )
3309            .expect("Unable to open file as Parquet");
3310            let metadata = file_reader.metadata();
3311
3312            // Gets bloom filters from all row groups.
3313            let mut bloom_filters: Vec<_> = vec![];
3314            for (ri, row_group) in metadata.row_groups().iter().enumerate() {
3315                if let Some((column_index, _)) = row_group
3316                    .columns()
3317                    .iter()
3318                    .enumerate()
3319                    .find(|(_, column)| column.column_path().string() == file_column)
3320                {
3321                    let row_group_reader = file_reader
3322                        .get_row_group(ri)
3323                        .expect("Unable to read row group");
3324                    if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
3325                        bloom_filters.push(sbbf.clone());
3326                    } else {
3327                        panic!("No bloom filter for column named {file_column} found");
3328                    }
3329                } else {
3330                    panic!("No column named {file_column} found");
3331                }
3332            }
3333
3334            positive_values.iter().for_each(|value| {
3335                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
3336                assert!(
3337                    found.is_some(),
3338                    "{}",
3339                    format!("Value {:?} should be in bloom filter", value.as_bytes())
3340                );
3341            });
3342
3343            negative_values.iter().for_each(|value| {
3344                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
3345                assert!(
3346                    found.is_none(),
3347                    "{}",
3348                    format!("Value {:?} should not be in bloom filter", value.as_bytes())
3349                );
3350            });
3351        });
3352    }
3353
3354    #[test]
3355    fn all_null_primitive_single_column() {
3356        let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
3357        one_column_roundtrip(values, true);
3358    }
3359    #[test]
3360    fn null_single_column() {
3361        let values = Arc::new(NullArray::new(SMALL_SIZE));
3362        one_column_roundtrip(values, true);
3363        // null arrays are always nullable, a test with non-nullable nulls fails
3364    }
3365
3366    #[test]
3367    fn bool_single_column() {
3368        required_and_optional::<BooleanArray, _>(
3369            [true, false].iter().cycle().copied().take(SMALL_SIZE),
3370        );
3371    }
3372
3373    #[test]
3374    fn bool_large_single_column() {
3375        let values = Arc::new(
3376            [None, Some(true), Some(false)]
3377                .iter()
3378                .cycle()
3379                .copied()
3380                .take(200_000)
3381                .collect::<BooleanArray>(),
3382        );
3383        let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
3384        let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
3385        let file = tempfile::tempfile().unwrap();
3386
3387        let mut writer =
3388            ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
3389                .expect("Unable to write file");
3390        writer.write(&expected_batch).unwrap();
3391        writer.close().unwrap();
3392    }
3393
3394    #[test]
3395    fn check_page_offset_index_with_nan() {
3396        let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
3397        let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
3398        let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
3399
3400        let mut out = Vec::with_capacity(1024);
3401        let mut writer =
3402            ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
3403        writer.write(&batch).unwrap();
3404        let file_meta_data = writer.close().unwrap();
3405        for row_group in file_meta_data.row_groups() {
3406            for column in row_group.columns() {
3407                assert!(column.offset_index_offset().is_some());
3408                assert!(column.offset_index_length().is_some());
3409                assert!(column.column_index_offset().is_none());
3410                assert!(column.column_index_length().is_none());
3411            }
3412        }
3413    }
3414
3415    #[test]
3416    fn i8_single_column() {
3417        required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
3418    }
3419
3420    #[test]
3421    fn i16_single_column() {
3422        required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
3423    }
3424
3425    #[test]
3426    fn i32_single_column() {
3427        required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
3428    }
3429
3430    #[test]
3431    fn i64_single_column() {
3432        required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
3433    }
3434
3435    #[test]
3436    fn u8_single_column() {
3437        required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
3438    }
3439
3440    #[test]
3441    fn u16_single_column() {
3442        required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
3443    }
3444
3445    #[test]
3446    fn u32_single_column() {
3447        required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
3448    }
3449
3450    #[test]
3451    fn u64_single_column() {
3452        required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
3453    }
3454
3455    #[test]
3456    fn f32_single_column() {
3457        required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
3458    }
3459
3460    #[test]
3461    fn f64_single_column() {
3462        required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
3463    }
3464
    // The timestamp array types don't implement From<Vec<T>> because they need the timezone
    // argument, and they also don't support building from a Vec<Option<T>>, so call
    // one_column_roundtrip manually instead of calling required_and_optional for these tests.
3468
3469    #[test]
3470    fn timestamp_second_single_column() {
3471        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
3472        let values = Arc::new(TimestampSecondArray::from(raw_values));
3473
3474        one_column_roundtrip(values, false);
3475    }
3476
3477    #[test]
3478    fn timestamp_millisecond_single_column() {
3479        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
3480        let values = Arc::new(TimestampMillisecondArray::from(raw_values));
3481
3482        one_column_roundtrip(values, false);
3483    }
3484
3485    #[test]
3486    fn timestamp_microsecond_single_column() {
3487        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
3488        let values = Arc::new(TimestampMicrosecondArray::from(raw_values));
3489
3490        one_column_roundtrip(values, false);
3491    }
3492
3493    #[test]
3494    fn timestamp_nanosecond_single_column() {
3495        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
3496        let values = Arc::new(TimestampNanosecondArray::from(raw_values));
3497
3498        one_column_roundtrip(values, false);
3499    }
3500
3501    #[test]
3502    fn date32_single_column() {
3503        required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
3504    }
3505
3506    #[test]
3507    fn date64_single_column() {
3508        // Date64 must be a multiple of 86400000, see ARROW-10925
3509        required_and_optional::<Date64Array, _>(
3510            (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
3511        );
3512    }
3513
3514    #[test]
3515    fn time32_second_single_column() {
3516        required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
3517    }
3518
3519    #[test]
3520    fn time32_millisecond_single_column() {
3521        required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
3522    }
3523
3524    #[test]
3525    fn time64_microsecond_single_column() {
3526        required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
3527    }
3528
3529    #[test]
3530    fn time64_nanosecond_single_column() {
3531        required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
3532    }
3533
3534    #[test]
3535    fn duration_second_single_column() {
3536        required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
3537    }
3538
3539    #[test]
3540    fn duration_millisecond_single_column() {
3541        required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
3542    }
3543
3544    #[test]
3545    fn duration_microsecond_single_column() {
3546        required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
3547    }
3548
3549    #[test]
3550    fn duration_nanosecond_single_column() {
3551        required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
3552    }
3553
3554    #[test]
3555    fn interval_year_month_single_column() {
3556        required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
3557    }
3558
3559    #[test]
3560    fn interval_day_time_single_column() {
3561        required_and_optional::<IntervalDayTimeArray, _>(vec![
3562            IntervalDayTime::new(0, 1),
3563            IntervalDayTime::new(0, 3),
3564            IntervalDayTime::new(3, -2),
3565            IntervalDayTime::new(-200, 4),
3566        ]);
3567    }
3568
3569    #[test]
3570    #[should_panic(
3571        expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
3572    )]
3573    fn interval_month_day_nano_single_column() {
3574        required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
3575            IntervalMonthDayNano::new(0, 1, 5),
3576            IntervalMonthDayNano::new(0, 3, 2),
3577            IntervalMonthDayNano::new(3, -2, -5),
3578            IntervalMonthDayNano::new(-200, 4, -1),
3579        ]);
3580    }
3581
3582    #[test]
3583    fn binary_single_column() {
3584        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
3585        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
3586        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
3587
3588        // BinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
3589        values_required::<BinaryArray, _>(many_vecs_iter);
3590    }
3591
3592    #[test]
3593    fn binary_view_single_column() {
3594        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
3595        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
3596        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
3597
3598        // BinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
3599        values_required::<BinaryViewArray, _>(many_vecs_iter);
3600    }
3601
3602    #[test]
3603    fn i32_column_bloom_filter_at_end() {
3604        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
3605        let mut options = RoundTripOptions::new(array, false);
3606        options.bloom_filter = true;
3607        options.bloom_filter_position = BloomFilterPosition::End;
3608
3609        let files = one_column_roundtrip_with_options(options);
3610        check_bloom_filter(
3611            files,
3612            "col".to_string(),
3613            (0..SMALL_SIZE as i32).collect(),
3614            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
3615        );
3616    }
3617
3618    #[test]
3619    fn i32_column_bloom_filter() {
3620        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
3621        let mut options = RoundTripOptions::new(array, false);
3622        options.bloom_filter = true;
3623
3624        let files = one_column_roundtrip_with_options(options);
3625        check_bloom_filter(
3626            files,
3627            "col".to_string(),
3628            (0..SMALL_SIZE as i32).collect(),
3629            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
3630        );
3631    }
3632
3633    /// Test that bloom filter folding produces correct results even when
3634    /// the configured NDV differs significantly from actual NDV.
3635    /// A large NDV means a larger initial filter that gets folded down;
3636    /// a small NDV means a smaller initial filter.
3637    #[test]
3638    fn i32_column_bloom_filter_fixed_ndv() {
3639        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
3640
3641        // NDV much larger than actual distinct values — tests folding a large filter down
3642        let mut options = RoundTripOptions::new(array.clone(), false);
3643        options.bloom_filter = true;
3644        options.bloom_filter_ndv = Some(1_000_000);
3645
3646        let files = one_column_roundtrip_with_options(options);
3647        check_bloom_filter(
3648            files,
3649            "col".to_string(),
3650            (0..SMALL_SIZE as i32).collect(),
3651            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
3652        );
3653
3654        // NDV smaller than actual distinct values — tests the underestimate path
3655        let mut options = RoundTripOptions::new(array, false);
3656        options.bloom_filter = true;
3657        options.bloom_filter_ndv = Some(3);
3658
3659        let files = one_column_roundtrip_with_options(options);
3660        check_bloom_filter(
3661            files,
3662            "col".to_string(),
3663            (0..SMALL_SIZE as i32).collect(),
3664            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
3665        );
3666    }
3667
3668    #[test]
3669    fn binary_column_bloom_filter() {
3670        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
3671        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
3672        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
3673
3674        let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
3675        let mut options = RoundTripOptions::new(array, false);
3676        options.bloom_filter = true;
3677
3678        let files = one_column_roundtrip_with_options(options);
3679        check_bloom_filter(
3680            files,
3681            "col".to_string(),
3682            many_vecs,
3683            vec![vec![(SMALL_SIZE + 1) as u8]],
3684        );
3685    }
3686
3687    #[test]
3688    fn empty_string_null_column_bloom_filter() {
3689        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
3690        let raw_strs = raw_values.iter().map(|s| s.as_str());
3691
3692        let array = Arc::new(StringArray::from_iter_values(raw_strs));
3693        let mut options = RoundTripOptions::new(array, false);
3694        options.bloom_filter = true;
3695
3696        let files = one_column_roundtrip_with_options(options);
3697
3698        let optional_raw_values: Vec<_> = raw_values
3699            .iter()
3700            .enumerate()
3701            .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
3702            .collect();
3703        // For null slots, empty string should not be in bloom filter.
3704        check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
3705    }
3706
3707    #[test]
3708    fn large_binary_single_column() {
3709        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
3710        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
3711        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
3712
3713        // LargeBinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
3714        values_required::<LargeBinaryArray, _>(many_vecs_iter);
3715    }
3716
3717    #[test]
3718    fn fixed_size_binary_single_column() {
3719        let mut builder = FixedSizeBinaryBuilder::new(4);
3720        builder.append_value(b"0123").unwrap();
3721        builder.append_null();
3722        builder.append_value(b"8910").unwrap();
3723        builder.append_value(b"1112").unwrap();
3724        let array = Arc::new(builder.finish());
3725
3726        one_column_roundtrip(array, true);
3727    }
3728
3729    #[test]
3730    fn string_single_column() {
3731        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
3732        let raw_strs = raw_values.iter().map(|s| s.as_str());
3733
3734        required_and_optional::<StringArray, _>(raw_strs);
3735    }
3736
3737    #[test]
3738    fn large_string_single_column() {
3739        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
3740        let raw_strs = raw_values.iter().map(|s| s.as_str());
3741
3742        required_and_optional::<LargeStringArray, _>(raw_strs);
3743    }
3744
3745    #[test]
3746    fn string_view_single_column() {
3747        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
3748        let raw_strs = raw_values.iter().map(|s| s.as_str());
3749
3750        required_and_optional::<StringViewArray, _>(raw_strs);
3751    }
3752
3753    #[test]
3754    fn null_list_single_column() {
3755        let null_field = Field::new_list_field(DataType::Null, true);
3756        let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);
3757
3758        let schema = Schema::new(vec![list_field]);
3759
3760        // Build [[], null, [null, null]]
3761        let a_values = NullArray::new(2);
3762        let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
3763        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
3764            DataType::Null,
3765            true,
3766        ))))
3767        .len(3)
3768        .add_buffer(a_value_offsets)
3769        .null_bit_buffer(Some(Buffer::from([0b00000101])))
3770        .add_child_data(a_values.into_data())
3771        .build()
3772        .unwrap();
3773
3774        let a = ListArray::from(a_list_data);
3775
3776        assert!(a.is_valid(0));
3777        assert!(!a.is_valid(1));
3778        assert!(a.is_valid(2));
3779
3780        assert_eq!(a.value(0).len(), 0);
3781        assert_eq!(a.value(2).len(), 2);
3782        assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);
3783
3784        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
3785        roundtrip(batch, None);
3786    }
3787
3788    #[test]
3789    fn list_single_column() {
3790        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3791        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
3792        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
3793            DataType::Int32,
3794            false,
3795        ))))
3796        .len(5)
3797        .add_buffer(a_value_offsets)
3798        .null_bit_buffer(Some(Buffer::from([0b00011011])))
3799        .add_child_data(a_values.into_data())
3800        .build()
3801        .unwrap();
3802
3803        assert_eq!(a_list_data.null_count(), 1);
3804
3805        let a = ListArray::from(a_list_data);
3806        let values = Arc::new(a);
3807
3808        one_column_roundtrip(values, true);
3809    }
3810
3811    #[test]
3812    fn large_list_single_column() {
3813        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3814        let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
3815        let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
3816            "large_item",
3817            DataType::Int32,
3818            true,
3819        ))))
3820        .len(5)
3821        .add_buffer(a_value_offsets)
3822        .add_child_data(a_values.into_data())
3823        .null_bit_buffer(Some(Buffer::from([0b00011011])))
3824        .build()
3825        .unwrap();
3826
3827        // I think this setup is incorrect because this should pass
3828        assert_eq!(a_list_data.null_count(), 1);
3829
3830        let a = LargeListArray::from(a_list_data);
3831        let values = Arc::new(a);
3832
3833        one_column_roundtrip(values, true);
3834    }
3835
3836    #[test]
3837    fn list_nested_nulls() {
3838        use arrow::datatypes::Int32Type;
3839        let data = vec![
3840            Some(vec![Some(1)]),
3841            Some(vec![Some(2), Some(3)]),
3842            None,
3843            Some(vec![Some(4), Some(5), None]),
3844            Some(vec![None]),
3845            Some(vec![Some(6), Some(7)]),
3846        ];
3847
3848        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
3849        one_column_roundtrip(Arc::new(list), true);
3850
3851        let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
3852        one_column_roundtrip(Arc::new(list), true);
3853    }
3854
3855    #[test]
3856    fn struct_single_column() {
3857        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3858        let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
3859        let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
3860
3861        let values = Arc::new(s);
3862        one_column_roundtrip(values, false);
3863    }
3864
3865    #[test]
3866    fn list_and_map_coerced_names() {
3867        // Create map and list with non-Parquet naming
3868        let list_field =
3869            Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
3870        let map_field = Field::new_map(
3871            "my_map",
3872            "entries",
3873            Field::new("keys", DataType::Int32, false),
3874            Field::new("values", DataType::Int32, true),
3875            false,
3876            true,
3877        );
3878
3879        let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
3880        let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();
3881
3882        let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
3883
3884        // Write data to Parquet but coerce names to match spec
3885        let props = Some(WriterProperties::builder().set_coerce_types(true).build());
3886        let file = tempfile::tempfile().unwrap();
3887        let mut writer =
3888            ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();
3889
3890        let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
3891        writer.write(&batch).unwrap();
3892        let file_metadata = writer.close().unwrap();
3893
3894        let schema = file_metadata.file_metadata().schema();
3895        // Coerced name of "item" should be "element"
3896        let list_field = &schema.get_fields()[0].get_fields()[0];
3897        assert_eq!(list_field.get_fields()[0].name(), "element");
3898
3899        let map_field = &schema.get_fields()[1].get_fields()[0];
3900        // Coerced name of "entries" should be "key_value"
3901        assert_eq!(map_field.name(), "key_value");
3902        // Coerced name of "keys" should be "key"
3903        assert_eq!(map_field.get_fields()[0].name(), "key");
3904        // Coerced name of "values" should be "value"
3905        assert_eq!(map_field.get_fields()[1].name(), "value");
3906
3907        // Double check schema after reading from the file
3908        let reader = SerializedFileReader::new(file).unwrap();
3909        let file_schema = reader.metadata().file_metadata().schema();
3910        let fields = file_schema.get_fields();
3911        let list_field = &fields[0].get_fields()[0];
3912        assert_eq!(list_field.get_fields()[0].name(), "element");
3913        let map_field = &fields[1].get_fields()[0];
3914        assert_eq!(map_field.name(), "key_value");
3915        assert_eq!(map_field.get_fields()[0].name(), "key");
3916        assert_eq!(map_field.get_fields()[1].name(), "value");
3917    }
3918
3919    #[test]
3920    fn fallback_flush_data_page() {
3921        //tests if the Fallback::flush_data_page clears all buffers correctly
3922        let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
3923        let values = Arc::new(StringArray::from(raw_values));
3924        let encodings = vec![
3925            Encoding::DELTA_BYTE_ARRAY,
3926            Encoding::DELTA_LENGTH_BYTE_ARRAY,
3927        ];
3928        let data_type = values.data_type().clone();
3929        let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
3930        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3931
3932        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
3933        let data_page_size_limit: usize = 32;
3934        let write_batch_size: usize = 16;
3935
3936        for encoding in &encodings {
3937            for row_group_size in row_group_sizes {
3938                let props = WriterProperties::builder()
3939                    .set_writer_version(WriterVersion::PARQUET_2_0)
3940                    .set_max_row_group_row_count(Some(row_group_size))
3941                    .set_dictionary_enabled(false)
3942                    .set_encoding(*encoding)
3943                    .set_data_page_size_limit(data_page_size_limit)
3944                    .set_write_batch_size(write_batch_size)
3945                    .build();
3946
3947                roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
3948                    let string_array_a = StringArray::from(a.clone());
3949                    let string_array_b = StringArray::from(b.clone());
3950                    let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
3951                    let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
3952                    assert_eq!(
3953                        vec_a, vec_b,
3954                        "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
3955                    );
3956                });
3957            }
3958        }
3959    }
3960
3961    #[test]
3962    fn arrow_writer_string_dictionary() {
3963        // define schema
3964        #[allow(deprecated)]
3965        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3966            "dictionary",
3967            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3968            true,
3969            42,
3970            true,
3971        )]));
3972
3973        // create some data
3974        let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3975            .iter()
3976            .copied()
3977            .collect();
3978
3979        // build a record batch
3980        one_column_roundtrip_with_schema(Arc::new(d), schema);
3981    }
3982
    #[test]
    fn arrow_writer_test_type_compatibility() {
        /// Writes `array1` and then `array2` as two batches into one file, using a writer
        /// created with `array1`'s schema. It then reads the first batch back and asserts it
        /// equals `expected_result` under that initial schema.
        ///
        /// `array2` may have a different but compatible Arrow type. The write must succeed,
        /// and the data read back is expected in `array1`'s type.
        fn ensure_compatible_write<T1, T2>(array1: T1, array2: T2, expected_result: T1)
        where
            T1: Array + 'static,
            T2: Array + 'static,
        {
            let schema1 = Arc::new(Schema::new(vec![Field::new(
                "a",
                array1.data_type().clone(),
                false,
            )]));

            let file = tempfile().unwrap();
            let mut writer =
                ArrowWriter::try_new(file.try_clone().unwrap(), schema1.clone(), None).unwrap();

            // First batch matches the schema the writer was created with.
            let rb1 = RecordBatch::try_new(schema1.clone(), vec![Arc::new(array1)]).unwrap();
            writer.write(&rb1).unwrap();

            // Second batch deliberately carries a different data type for the same column.
            let schema2 = Arc::new(Schema::new(vec![Field::new(
                "a",
                array2.data_type().clone(),
                false,
            )]));
            let rb2 = RecordBatch::try_new(schema2, vec![Arc::new(array2)]).unwrap();
            writer.write(&rb2).unwrap();

            writer.close().unwrap();

            // Read back only the first batch (batch size 1024).
            let mut record_batch_reader =
                ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
            let actual_batch = record_batch_reader.next().unwrap().unwrap();

            // The round-tripped data is compared under the writer's original schema.
            let expected_batch =
                RecordBatch::try_new(schema1, vec![Arc::new(expected_result)]).unwrap();
            assert_eq!(actual_batch, expected_batch);
        }

        // check compatibility between native and dictionaries

        ensure_compatible_write(
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0]),
                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
            ),
            StringArray::from_iter_values(vec!["barquet"]),
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0, 1]),
                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
            ),
        );

        ensure_compatible_write(
            StringArray::from_iter_values(vec!["parquet"]),
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0]),
                Arc::new(StringArray::from_iter_values(vec!["barquet"])),
            ),
            StringArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        // check compatibility between dictionaries with different key types

        ensure_compatible_write(
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0]),
                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
            ),
            DictionaryArray::new(
                UInt16Array::from_iter_values(vec![0]),
                Arc::new(StringArray::from_iter_values(vec!["barquet"])),
            ),
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0, 1]),
                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
            ),
        );

        // check compatibility between dictionaries with different value types
        ensure_compatible_write(
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0]),
                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
            ),
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0]),
                Arc::new(LargeStringArray::from_iter_values(vec!["barquet"])),
            ),
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0, 1]),
                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
            ),
        );

        // check compatibility between a dictionary and a native array with a different type
        ensure_compatible_write(
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0]),
                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
            ),
            LargeStringArray::from_iter_values(vec!["barquet"]),
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0, 1]),
                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
            ),
        );

        // check compatibility for string types
        // (every ordered pair of Utf8 / LargeUtf8 / Utf8View)

        ensure_compatible_write(
            StringArray::from_iter_values(vec!["parquet"]),
            LargeStringArray::from_iter_values(vec!["barquet"]),
            StringArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        ensure_compatible_write(
            LargeStringArray::from_iter_values(vec!["parquet"]),
            StringArray::from_iter_values(vec!["barquet"]),
            LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        ensure_compatible_write(
            StringArray::from_iter_values(vec!["parquet"]),
            StringViewArray::from_iter_values(vec!["barquet"]),
            StringArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        ensure_compatible_write(
            StringViewArray::from_iter_values(vec!["parquet"]),
            StringArray::from_iter_values(vec!["barquet"]),
            StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        ensure_compatible_write(
            LargeStringArray::from_iter_values(vec!["parquet"]),
            StringViewArray::from_iter_values(vec!["barquet"]),
            LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        ensure_compatible_write(
            StringViewArray::from_iter_values(vec!["parquet"]),
            LargeStringArray::from_iter_values(vec!["barquet"]),
            StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        // check compatibility for binary types
        // (every ordered pair of Binary / LargeBinary / BinaryView)

        ensure_compatible_write(
            BinaryArray::from_iter_values(vec![b"parquet"]),
            LargeBinaryArray::from_iter_values(vec![b"barquet"]),
            BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
        );

        ensure_compatible_write(
            LargeBinaryArray::from_iter_values(vec![b"parquet"]),
            BinaryArray::from_iter_values(vec![b"barquet"]),
            LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
        );

        ensure_compatible_write(
            BinaryArray::from_iter_values(vec![b"parquet"]),
            BinaryViewArray::from_iter_values(vec![b"barquet"]),
            BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
        );

        ensure_compatible_write(
            BinaryViewArray::from_iter_values(vec![b"parquet"]),
            BinaryArray::from_iter_values(vec![b"barquet"]),
            BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
        );

        ensure_compatible_write(
            BinaryViewArray::from_iter_values(vec![b"parquet"]),
            LargeBinaryArray::from_iter_values(vec![b"barquet"]),
            BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
        );

        ensure_compatible_write(
            LargeBinaryArray::from_iter_values(vec![b"parquet"]),
            BinaryViewArray::from_iter_values(vec![b"barquet"]),
            LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
        );

        // check compatibility for list types
        // The list element field of the first batch carries field-id metadata
        // (PARQUET_FIELD_ID_META_KEY = "1"); the second batch's element field does not.

        let list_field_metadata = HashMap::from_iter(vec![(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            "1".to_string(),
        )]);
        let list_field = Field::new_list_field(DataType::Int32, false);

        // Batch 1: [[0, 1], [2, 3, 4]]
        let values1 = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4]));
        let offsets1 = OffsetBuffer::new(vec![0, 2, 5].into());

        // Batch 2: [[5, 6, 7], [8, 9]]
        let values2 = Arc::new(Int32Array::from(vec![5, 6, 7, 8, 9]));
        let offsets2 = OffsetBuffer::new(vec![0, 3, 5].into());

        // Expected: the four lists concatenated in write order.
        let values_expected = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]));
        let offsets_expected = OffsetBuffer::new(vec![0, 2, 5, 8, 10].into());

        ensure_compatible_write(
            // when the initial schema has the metadata ...
            ListArray::try_new(
                Arc::new(
                    list_field
                        .clone()
                        .with_metadata(list_field_metadata.clone()),
                ),
                offsets1,
                values1,
                None,
            )
            .unwrap(),
            // ... and some intermediate schema doesn't have the metadata
            ListArray::try_new(Arc::new(list_field.clone()), offsets2, values2, None).unwrap(),
            // ... the write will still go through, and the resulting schema will inherit the initial metadata
            ListArray::try_new(
                Arc::new(
                    list_field
                        .clone()
                        .with_metadata(list_field_metadata.clone()),
                ),
                offsets_expected,
                values_expected,
                None,
            )
            .unwrap(),
        );
    }
4213
4214    #[test]
4215    fn arrow_writer_primitive_dictionary() {
4216        // define schema
4217        #[allow(deprecated)]
4218        let schema = Arc::new(Schema::new(vec![Field::new_dict(
4219            "dictionary",
4220            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
4221            true,
4222            42,
4223            true,
4224        )]));
4225
4226        // create some data
4227        let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
4228        builder.append(12345678).unwrap();
4229        builder.append_null();
4230        builder.append(22345678).unwrap();
4231        builder.append(12345678).unwrap();
4232        let d = builder.finish();
4233
4234        one_column_roundtrip_with_schema(Arc::new(d), schema);
4235    }
4236
4237    #[test]
4238    fn arrow_writer_decimal32_dictionary() {
4239        let integers = vec![12345, 56789, 34567];
4240
4241        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
4242
4243        let values = Decimal32Array::from(integers.clone())
4244            .with_precision_and_scale(5, 2)
4245            .unwrap();
4246
4247        let array = DictionaryArray::new(keys, Arc::new(values));
4248        one_column_roundtrip(Arc::new(array.clone()), true);
4249
4250        let values = Decimal32Array::from(integers)
4251            .with_precision_and_scale(9, 2)
4252            .unwrap();
4253
4254        let array = array.with_values(Arc::new(values));
4255        one_column_roundtrip(Arc::new(array), true);
4256    }
4257
4258    #[test]
4259    fn arrow_writer_decimal64_dictionary() {
4260        let integers = vec![12345, 56789, 34567];
4261
4262        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
4263
4264        let values = Decimal64Array::from(integers.clone())
4265            .with_precision_and_scale(5, 2)
4266            .unwrap();
4267
4268        let array = DictionaryArray::new(keys, Arc::new(values));
4269        one_column_roundtrip(Arc::new(array.clone()), true);
4270
4271        let values = Decimal64Array::from(integers)
4272            .with_precision_and_scale(12, 2)
4273            .unwrap();
4274
4275        let array = array.with_values(Arc::new(values));
4276        one_column_roundtrip(Arc::new(array), true);
4277    }
4278
4279    #[test]
4280    fn arrow_writer_decimal128_dictionary() {
4281        let integers = vec![12345, 56789, 34567];
4282
4283        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
4284
4285        let values = Decimal128Array::from(integers.clone())
4286            .with_precision_and_scale(5, 2)
4287            .unwrap();
4288
4289        let array = DictionaryArray::new(keys, Arc::new(values));
4290        one_column_roundtrip(Arc::new(array.clone()), true);
4291
4292        let values = Decimal128Array::from(integers)
4293            .with_precision_and_scale(12, 2)
4294            .unwrap();
4295
4296        let array = array.with_values(Arc::new(values));
4297        one_column_roundtrip(Arc::new(array), true);
4298    }
4299
4300    #[test]
4301    fn arrow_writer_decimal256_dictionary() {
4302        let integers = vec![
4303            i256::from_i128(12345),
4304            i256::from_i128(56789),
4305            i256::from_i128(34567),
4306        ];
4307
4308        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
4309
4310        let values = Decimal256Array::from(integers.clone())
4311            .with_precision_and_scale(5, 2)
4312            .unwrap();
4313
4314        let array = DictionaryArray::new(keys, Arc::new(values));
4315        one_column_roundtrip(Arc::new(array.clone()), true);
4316
4317        let values = Decimal256Array::from(integers)
4318            .with_precision_and_scale(12, 2)
4319            .unwrap();
4320
4321        let array = array.with_values(Arc::new(values));
4322        one_column_roundtrip(Arc::new(array), true);
4323    }
4324
4325    #[test]
4326    fn arrow_writer_string_dictionary_unsigned_index() {
4327        // define schema
4328        #[allow(deprecated)]
4329        let schema = Arc::new(Schema::new(vec![Field::new_dict(
4330            "dictionary",
4331            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
4332            true,
4333            42,
4334            true,
4335        )]));
4336
4337        // create some data
4338        let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
4339            .iter()
4340            .copied()
4341            .collect();
4342
4343        one_column_roundtrip_with_schema(Arc::new(d), schema);
4344    }
4345
4346    #[test]
4347    fn u32_min_max() {
4348        // check values roundtrip through parquet
4349        let src = [
4350            u32::MIN,
4351            u32::MIN + 1,
4352            (i32::MAX as u32) - 1,
4353            i32::MAX as u32,
4354            (i32::MAX as u32) + 1,
4355            u32::MAX - 1,
4356            u32::MAX,
4357        ];
4358        let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
4359        let files = one_column_roundtrip(values, false);
4360
4361        for file in files {
4362            // check statistics are valid
4363            let reader = SerializedFileReader::new(file).unwrap();
4364            let metadata = reader.metadata();
4365
4366            let mut row_offset = 0;
4367            for row_group in metadata.row_groups() {
4368                assert_eq!(row_group.num_columns(), 1);
4369                let column = row_group.column(0);
4370
4371                let num_values = column.num_values() as usize;
4372                let src_slice = &src[row_offset..row_offset + num_values];
4373                row_offset += column.num_values() as usize;
4374
4375                let stats = column.statistics().unwrap();
4376                if let Statistics::Int32(stats) = stats {
4377                    assert_eq!(
4378                        *stats.min_opt().unwrap() as u32,
4379                        *src_slice.iter().min().unwrap()
4380                    );
4381                    assert_eq!(
4382                        *stats.max_opt().unwrap() as u32,
4383                        *src_slice.iter().max().unwrap()
4384                    );
4385                } else {
4386                    panic!("Statistics::Int32 missing")
4387                }
4388            }
4389        }
4390    }
4391
4392    #[test]
4393    fn u64_min_max() {
4394        // check values roundtrip through parquet
4395        let src = [
4396            u64::MIN,
4397            u64::MIN + 1,
4398            (i64::MAX as u64) - 1,
4399            i64::MAX as u64,
4400            (i64::MAX as u64) + 1,
4401            u64::MAX - 1,
4402            u64::MAX,
4403        ];
4404        let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
4405        let files = one_column_roundtrip(values, false);
4406
4407        for file in files {
4408            // check statistics are valid
4409            let reader = SerializedFileReader::new(file).unwrap();
4410            let metadata = reader.metadata();
4411
4412            let mut row_offset = 0;
4413            for row_group in metadata.row_groups() {
4414                assert_eq!(row_group.num_columns(), 1);
4415                let column = row_group.column(0);
4416
4417                let num_values = column.num_values() as usize;
4418                let src_slice = &src[row_offset..row_offset + num_values];
4419                row_offset += column.num_values() as usize;
4420
4421                let stats = column.statistics().unwrap();
4422                if let Statistics::Int64(stats) = stats {
4423                    assert_eq!(
4424                        *stats.min_opt().unwrap() as u64,
4425                        *src_slice.iter().min().unwrap()
4426                    );
4427                    assert_eq!(
4428                        *stats.max_opt().unwrap() as u64,
4429                        *src_slice.iter().max().unwrap()
4430                    );
4431                } else {
4432                    panic!("Statistics::Int64 missing")
4433                }
4434            }
4435        }
4436    }
4437
4438    #[test]
4439    fn statistics_null_counts_only_nulls() {
4440        // check that null-count statistics for "only NULL"-columns are correct
4441        let values = Arc::new(UInt64Array::from(vec![None, None]));
4442        let files = one_column_roundtrip(values, true);
4443
4444        for file in files {
4445            // check statistics are valid
4446            let reader = SerializedFileReader::new(file).unwrap();
4447            let metadata = reader.metadata();
4448            assert_eq!(metadata.num_row_groups(), 1);
4449            let row_group = metadata.row_group(0);
4450            assert_eq!(row_group.num_columns(), 1);
4451            let column = row_group.column(0);
4452            let stats = column.statistics().unwrap();
4453            assert_eq!(stats.null_count_opt(), Some(2));
4454        }
4455    }
4456
4457    #[test]
4458    fn test_list_of_struct_roundtrip() {
4459        // define schema
4460        let int_field = Field::new("a", DataType::Int32, true);
4461        let int_field2 = Field::new("b", DataType::Int32, true);
4462
4463        let int_builder = Int32Builder::with_capacity(10);
4464        let int_builder2 = Int32Builder::with_capacity(10);
4465
4466        let struct_builder = StructBuilder::new(
4467            vec![int_field, int_field2],
4468            vec![Box::new(int_builder), Box::new(int_builder2)],
4469        );
4470        let mut list_builder = ListBuilder::new(struct_builder);
4471
4472        // Construct the following array
4473        // [{a: 1, b: 2}], [], null, [null, null], [{a: null, b: 3}], [{a: 2, b: null}]
4474
4475        // [{a: 1, b: 2}]
4476        let values = list_builder.values();
4477        values
4478            .field_builder::<Int32Builder>(0)
4479            .unwrap()
4480            .append_value(1);
4481        values
4482            .field_builder::<Int32Builder>(1)
4483            .unwrap()
4484            .append_value(2);
4485        values.append(true);
4486        list_builder.append(true);
4487
4488        // []
4489        list_builder.append(true);
4490
4491        // null
4492        list_builder.append(false);
4493
4494        // [null, null]
4495        let values = list_builder.values();
4496        values
4497            .field_builder::<Int32Builder>(0)
4498            .unwrap()
4499            .append_null();
4500        values
4501            .field_builder::<Int32Builder>(1)
4502            .unwrap()
4503            .append_null();
4504        values.append(false);
4505        values
4506            .field_builder::<Int32Builder>(0)
4507            .unwrap()
4508            .append_null();
4509        values
4510            .field_builder::<Int32Builder>(1)
4511            .unwrap()
4512            .append_null();
4513        values.append(false);
4514        list_builder.append(true);
4515
4516        // [{a: null, b: 3}]
4517        let values = list_builder.values();
4518        values
4519            .field_builder::<Int32Builder>(0)
4520            .unwrap()
4521            .append_null();
4522        values
4523            .field_builder::<Int32Builder>(1)
4524            .unwrap()
4525            .append_value(3);
4526        values.append(true);
4527        list_builder.append(true);
4528
4529        // [{a: 2, b: null}]
4530        let values = list_builder.values();
4531        values
4532            .field_builder::<Int32Builder>(0)
4533            .unwrap()
4534            .append_value(2);
4535        values
4536            .field_builder::<Int32Builder>(1)
4537            .unwrap()
4538            .append_null();
4539        values.append(true);
4540        list_builder.append(true);
4541
4542        let array = Arc::new(list_builder.finish());
4543
4544        one_column_roundtrip(array, true);
4545    }
4546
4547    fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
4548        metadata.row_groups().iter().map(|x| x.num_rows()).collect()
4549    }
4550
4551    #[test]
4552    fn test_aggregates_records() {
4553        let arrays = [
4554            Int32Array::from((0..100).collect::<Vec<_>>()),
4555            Int32Array::from((0..50).collect::<Vec<_>>()),
4556            Int32Array::from((200..500).collect::<Vec<_>>()),
4557        ];
4558
4559        let schema = Arc::new(Schema::new(vec![Field::new(
4560            "int",
4561            ArrowDataType::Int32,
4562            false,
4563        )]));
4564
4565        let file = tempfile::tempfile().unwrap();
4566
4567        let props = WriterProperties::builder()
4568            .set_max_row_group_row_count(Some(200))
4569            .build();
4570
4571        let mut writer =
4572            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
4573
4574        for array in arrays {
4575            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
4576            writer.write(&batch).unwrap();
4577        }
4578
4579        writer.close().unwrap();
4580
4581        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4582        assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);
4583
4584        let batches = builder
4585            .with_batch_size(100)
4586            .build()
4587            .unwrap()
4588            .collect::<ArrowResult<Vec<_>>>()
4589            .unwrap();
4590
4591        assert_eq!(batches.len(), 5);
4592        assert!(batches.iter().all(|x| x.num_columns() == 1));
4593
4594        let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
4595
4596        assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
4597
4598        let values: Vec<_> = batches
4599            .iter()
4600            .flat_map(|x| {
4601                x.column(0)
4602                    .as_any()
4603                    .downcast_ref::<Int32Array>()
4604                    .unwrap()
4605                    .values()
4606                    .iter()
4607                    .cloned()
4608            })
4609            .collect();
4610
4611        let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
4612        assert_eq!(&values, &expected_values)
4613    }
4614
    #[test]
    fn complex_aggregate() {
        // Tests aggregating nested data
        //
        // Schema: `struct_b` (non-nullable struct) holding a nullable `list` of nullable
        // `struct_a { leaf_a: Int32 (non-null), leaf_b: Int32 (nullable) }`.
        // Two batches (5 rows + 2 rows) are written with a 6-row row group limit, so the
        // second batch straddles a row group boundary. The data must read back unchanged
        // even when re-sliced into 2-row output batches.
        let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
        let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
        let struct_a = Arc::new(Field::new(
            "struct_a",
            DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
            true,
        ));

        let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
        let struct_b = Arc::new(Field::new(
            "struct_b",
            DataType::Struct(vec![list_a.clone()].into()),
            false,
        ));

        let schema = Arc::new(Schema::new(vec![struct_b]));

        // create nested data
        // batch1 child: 6 struct_a entries (leaf_a 1..=6).
        let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
        let field_b_array =
            Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);

        let struct_a_array = StructArray::from(vec![
            (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
            (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
        ]);

        // batch1 list: 5 rows. Offsets [0, 1, 1, 3, 3, 5] slice the child as
        // [0..1], [1..1], [1..3], [3..3], [3..5]. The validity bitmap nulls rows 1 and 3.
        // The 6th child entry (leaf_a 6) is never referenced by any offset.
        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
            .len(5)
            .add_buffer(Buffer::from_iter(vec![
                0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
            ]))
            .null_bit_buffer(Some(Buffer::from_iter(vec![
                true, false, true, false, true,
            ])))
            .child_data(vec![struct_a_array.into_data()])
            .build()
            .unwrap();

        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
        let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);

        let batch1 =
            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
                .unwrap();

        // batch2 child: 5 struct_a entries (leaf_a 6..=10).
        let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
        let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);

        let struct_a_array = StructArray::from(vec![
            (field_a, Arc::new(field_a_array) as ArrayRef),
            (field_b, Arc::new(field_b_array) as ArrayRef),
        ]);

        // batch2 list: 2 rows with offsets [0, 4, 5] and no validity buffer (both non-null).
        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
            .len(2)
            .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
            .child_data(vec![struct_a_array.into_data()])
            .build()
            .unwrap();

        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
        let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);

        let batch2 =
            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
                .unwrap();

        let batches = &[batch1, batch2];

        // Verify data is as expected
        // (each line of the table is trimmed so the literal's indentation does not matter)

        let expected = r#"
            +-------------------------------------------------------------------------------------------------------+
            | struct_b                                                                                              |
            +-------------------------------------------------------------------------------------------------------+
            | {list: [{leaf_a: 1, leaf_b: 1}]}                                                                      |
            | {list: }                                                                                              |
            | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]}                                               |
            | {list: }                                                                                              |
            | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]}                                                |
            | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
            | {list: [{leaf_a: 10, leaf_b: }]}                                                                      |
            +-------------------------------------------------------------------------------------------------------+
        "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");

        let actual = pretty_format_batches(batches).unwrap().to_string();
        assert_eq!(actual, expected);

        // Write data
        // At most 6 rows per row group.
        let file = tempfile::tempfile().unwrap();
        let props = WriterProperties::builder()
            .set_max_row_group_row_count(Some(6))
            .build();

        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();

        for batch in batches {
            writer.write(batch).unwrap();
        }
        writer.close().unwrap();

        // Read Data
        // Should have written entire first batch and first row of second to the first row group
        // leaving a single row in the second row group

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);

        // Re-read in 2-row batches: 7 rows total -> [2, 2, 2, 1].
        let batches = builder
            .with_batch_size(2)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(batches.len(), 4);
        let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
        assert_eq!(&batch_counts, &[2, 2, 2, 1]);

        // The re-sliced output must render identically to the input.
        let actual = pretty_format_batches(&batches).unwrap().to_string();
        assert_eq!(actual, expected);
    }
4742
4743    #[test]
4744    fn test_arrow_writer_metadata() {
4745        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
4746        let file_schema = batch_schema.clone().with_metadata(
4747            vec![("foo".to_string(), "bar".to_string())]
4748                .into_iter()
4749                .collect(),
4750        );
4751
4752        let batch = RecordBatch::try_new(
4753            Arc::new(batch_schema),
4754            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4755        )
4756        .unwrap();
4757
4758        let mut buf = Vec::with_capacity(1024);
4759        let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
4760        writer.write(&batch).unwrap();
4761        writer.close().unwrap();
4762    }
4763
4764    #[test]
4765    fn test_arrow_writer_nullable() {
4766        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
4767        let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
4768        let file_schema = Arc::new(file_schema);
4769
4770        let batch = RecordBatch::try_new(
4771            Arc::new(batch_schema),
4772            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4773        )
4774        .unwrap();
4775
4776        let mut buf = Vec::with_capacity(1024);
4777        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
4778        writer.write(&batch).unwrap();
4779        writer.close().unwrap();
4780
4781        let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
4782        let back = read.next().unwrap().unwrap();
4783        assert_eq!(back.schema(), file_schema);
4784        assert_ne!(back.schema(), batch.schema());
4785        assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
4786    }
4787
4788    #[test]
4789    fn in_progress_accounting() {
4790        // define schema
4791        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
4792
4793        // create some data
4794        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
4795
4796        // build a record batch
4797        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
4798
4799        let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
4800
4801        // starts empty
4802        assert_eq!(writer.in_progress_size(), 0);
4803        assert_eq!(writer.in_progress_rows(), 0);
4804        assert_eq!(writer.memory_size(), 0);
4805        assert_eq!(writer.bytes_written(), 4); // Initial header
4806        writer.write(&batch).unwrap();
4807
4808        // updated on write
4809        let initial_size = writer.in_progress_size();
4810        assert!(initial_size > 0);
4811        assert_eq!(writer.in_progress_rows(), 5);
4812        let initial_memory = writer.memory_size();
4813        assert!(initial_memory > 0);
4814        // memory estimate is larger than estimated encoded size
4815        assert!(
4816            initial_size <= initial_memory,
4817            "{initial_size} <= {initial_memory}"
4818        );
4819
4820        // updated on second write
4821        writer.write(&batch).unwrap();
4822        assert!(writer.in_progress_size() > initial_size);
4823        assert_eq!(writer.in_progress_rows(), 10);
4824        assert!(writer.memory_size() > initial_memory);
4825        assert!(
4826            writer.in_progress_size() <= writer.memory_size(),
4827            "in_progress_size {} <= memory_size {}",
4828            writer.in_progress_size(),
4829            writer.memory_size()
4830        );
4831
4832        // in progress tracking is cleared, but the overall data written is updated
4833        let pre_flush_bytes_written = writer.bytes_written();
4834        writer.flush().unwrap();
4835        assert_eq!(writer.in_progress_size(), 0);
4836        assert_eq!(writer.memory_size(), 0);
4837        assert!(writer.bytes_written() > pre_flush_bytes_written);
4838
4839        writer.close().unwrap();
4840    }
4841
4842    #[test]
4843    fn test_writer_all_null() {
4844        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
4845        let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
4846        let batch = RecordBatch::try_from_iter(vec![
4847            ("a", Arc::new(a) as ArrayRef),
4848            ("b", Arc::new(b) as ArrayRef),
4849        ])
4850        .unwrap();
4851
4852        let mut buf = Vec::with_capacity(1024);
4853        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
4854        writer.write(&batch).unwrap();
4855        writer.close().unwrap();
4856
4857        let bytes = Bytes::from(buf);
4858        let options = ReadOptionsBuilder::new().with_page_index().build();
4859        let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
4860        let index = reader.metadata().offset_index().unwrap();
4861
4862        assert_eq!(index.len(), 1);
4863        assert_eq!(index[0].len(), 2); // 2 columns
4864        assert_eq!(index[0][0].page_locations().len(), 1); // 1 page
4865        assert_eq!(index[0][1].page_locations().len(), 1); // 1 page
4866    }
4867
4868    #[test]
4869    fn test_disabled_statistics_with_page() {
4870        let file_schema = Schema::new(vec![
4871            Field::new("a", DataType::Utf8, true),
4872            Field::new("b", DataType::Utf8, true),
4873        ]);
4874        let file_schema = Arc::new(file_schema);
4875
4876        let batch = RecordBatch::try_new(
4877            file_schema.clone(),
4878            vec![
4879                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
4880                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
4881            ],
4882        )
4883        .unwrap();
4884
4885        let props = WriterProperties::builder()
4886            .set_statistics_enabled(EnabledStatistics::None)
4887            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
4888            .build();
4889
4890        let mut buf = Vec::with_capacity(1024);
4891        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
4892        writer.write(&batch).unwrap();
4893
4894        let metadata = writer.close().unwrap();
4895        assert_eq!(metadata.num_row_groups(), 1);
4896        let row_group = metadata.row_group(0);
4897        assert_eq!(row_group.num_columns(), 2);
4898        // Column "a" has both offset and column index, as requested
4899        assert!(row_group.column(0).offset_index_offset().is_some());
4900        assert!(row_group.column(0).column_index_offset().is_some());
4901        // Column "b" should only have offset index
4902        assert!(row_group.column(1).offset_index_offset().is_some());
4903        assert!(row_group.column(1).column_index_offset().is_none());
4904
4905        let options = ReadOptionsBuilder::new().with_page_index().build();
4906        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
4907
4908        let row_group = reader.get_row_group(0).unwrap();
4909        let a_col = row_group.metadata().column(0);
4910        let b_col = row_group.metadata().column(1);
4911
4912        // Column chunk of column "a" should have chunk level statistics
4913        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
4914            let min = byte_array_stats.min_opt().unwrap();
4915            let max = byte_array_stats.max_opt().unwrap();
4916
4917            assert_eq!(min.as_bytes(), b"a");
4918            assert_eq!(max.as_bytes(), b"d");
4919        } else {
4920            panic!("expecting Statistics::ByteArray");
4921        }
4922
4923        // The column chunk for column "b" shouldn't have statistics
4924        assert!(b_col.statistics().is_none());
4925
4926        let offset_index = reader.metadata().offset_index().unwrap();
4927        assert_eq!(offset_index.len(), 1); // 1 row group
4928        assert_eq!(offset_index[0].len(), 2); // 2 columns
4929
4930        let column_index = reader.metadata().column_index().unwrap();
4931        assert_eq!(column_index.len(), 1); // 1 row group
4932        assert_eq!(column_index[0].len(), 2); // 2 columns
4933
4934        let a_idx = &column_index[0][0];
4935        assert!(
4936            matches!(a_idx, ColumnIndexMetaData::BYTE_ARRAY(_)),
4937            "{a_idx:?}"
4938        );
4939        let b_idx = &column_index[0][1];
4940        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
4941    }
4942
4943    #[test]
4944    fn test_disabled_statistics_with_chunk() {
4945        let file_schema = Schema::new(vec![
4946            Field::new("a", DataType::Utf8, true),
4947            Field::new("b", DataType::Utf8, true),
4948        ]);
4949        let file_schema = Arc::new(file_schema);
4950
4951        let batch = RecordBatch::try_new(
4952            file_schema.clone(),
4953            vec![
4954                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
4955                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
4956            ],
4957        )
4958        .unwrap();
4959
4960        let props = WriterProperties::builder()
4961            .set_statistics_enabled(EnabledStatistics::None)
4962            .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
4963            .build();
4964
4965        let mut buf = Vec::with_capacity(1024);
4966        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
4967        writer.write(&batch).unwrap();
4968
4969        let metadata = writer.close().unwrap();
4970        assert_eq!(metadata.num_row_groups(), 1);
4971        let row_group = metadata.row_group(0);
4972        assert_eq!(row_group.num_columns(), 2);
4973        // Column "a" should only have offset index
4974        assert!(row_group.column(0).offset_index_offset().is_some());
4975        assert!(row_group.column(0).column_index_offset().is_none());
4976        // Column "b" should only have offset index
4977        assert!(row_group.column(1).offset_index_offset().is_some());
4978        assert!(row_group.column(1).column_index_offset().is_none());
4979
4980        let options = ReadOptionsBuilder::new().with_page_index().build();
4981        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
4982
4983        let row_group = reader.get_row_group(0).unwrap();
4984        let a_col = row_group.metadata().column(0);
4985        let b_col = row_group.metadata().column(1);
4986
4987        // Column chunk of column "a" should have chunk level statistics
4988        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
4989            let min = byte_array_stats.min_opt().unwrap();
4990            let max = byte_array_stats.max_opt().unwrap();
4991
4992            assert_eq!(min.as_bytes(), b"a");
4993            assert_eq!(max.as_bytes(), b"d");
4994        } else {
4995            panic!("expecting Statistics::ByteArray");
4996        }
4997
4998        // The column chunk for column "b"  shouldn't have statistics
4999        assert!(b_col.statistics().is_none());
5000
5001        let column_index = reader.metadata().column_index().unwrap();
5002        assert_eq!(column_index.len(), 1); // 1 row group
5003        assert_eq!(column_index[0].len(), 2); // 2 columns
5004
5005        let a_idx = &column_index[0][0];
5006        assert!(matches!(a_idx, ColumnIndexMetaData::NONE), "{a_idx:?}");
5007        let b_idx = &column_index[0][1];
5008        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
5009    }
5010
5011    #[test]
5012    fn test_arrow_writer_skip_metadata() {
5013        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
5014        let file_schema = Arc::new(batch_schema.clone());
5015
5016        let batch = RecordBatch::try_new(
5017            Arc::new(batch_schema),
5018            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
5019        )
5020        .unwrap();
5021        let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
5022
5023        let mut buf = Vec::with_capacity(1024);
5024        let mut writer =
5025            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
5026        writer.write(&batch).unwrap();
5027        writer.close().unwrap();
5028
5029        let bytes = Bytes::from(buf);
5030        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
5031        assert_eq!(file_schema, *reader_builder.schema());
5032        if let Some(key_value_metadata) = reader_builder
5033            .metadata()
5034            .file_metadata()
5035            .key_value_metadata()
5036        {
5037            assert!(
5038                !key_value_metadata
5039                    .iter()
5040                    .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY)
5041            );
5042        }
5043    }
5044
5045    #[test]
5046    fn test_arrow_writer_skip_path_in_schema() {
5047        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
5048        let file_schema = Arc::new(batch_schema.clone());
5049
5050        let batch = RecordBatch::try_new(
5051            Arc::new(batch_schema),
5052            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
5053        )
5054        .unwrap();
5055
5056        // default options should still write path_in_schema
5057        let skip_options = ArrowWriterOptions::new();
5058
5059        let mut buf = Vec::with_capacity(1024);
5060        let mut writer =
5061            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
5062        writer.write(&batch).unwrap();
5063        writer.close().unwrap();
5064
5065        // override to not write path_in_schema
5066        let skip_options = ArrowWriterOptions::new().with_properties(
5067            WriterProperties::builder()
5068                .set_write_path_in_schema(false)
5069                .build(),
5070        );
5071
5072        let mut buf2 = Vec::with_capacity(1024);
5073        let mut writer =
5074            ArrowWriter::try_new_with_options(&mut buf2, file_schema.clone(), skip_options)
5075                .unwrap();
5076        writer.write(&batch).unwrap();
5077        writer.close().unwrap();
5078
5079        // buf2 should be a bit smaller due to lack of path_in_schema
5080        assert!(buf.len() > buf2.len());
5081    }
5082
5083    #[test]
5084    fn mismatched_schemas() {
5085        let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
5086        let file_schema = Arc::new(Schema::new(vec![Field::new(
5087            "temperature",
5088            DataType::Float64,
5089            false,
5090        )]));
5091
5092        let batch = RecordBatch::try_new(
5093            Arc::new(batch_schema),
5094            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
5095        )
5096        .unwrap();
5097
5098        let mut buf = Vec::with_capacity(1024);
5099        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
5100
5101        let err = writer.write(&batch).unwrap_err().to_string();
5102        assert_eq!(
5103            err,
5104            "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
5105        );
5106    }
5107
5108    #[test]
5109    // https://github.com/apache/arrow-rs/issues/6988
5110    fn test_roundtrip_empty_schema() {
5111        // create empty record batch with empty schema
5112        let empty_batch = RecordBatch::try_new_with_options(
5113            Arc::new(Schema::empty()),
5114            vec![],
5115            &RecordBatchOptions::default().with_row_count(Some(0)),
5116        )
5117        .unwrap();
5118
5119        // write to parquet
5120        let mut parquet_bytes: Vec<u8> = Vec::new();
5121        let mut writer =
5122            ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
5123        writer.write(&empty_batch).unwrap();
5124        writer.close().unwrap();
5125
5126        // read from parquet
5127        let bytes = Bytes::from(parquet_bytes);
5128        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
5129        assert_eq!(reader.schema(), &empty_batch.schema());
5130        let batches: Vec<_> = reader
5131            .build()
5132            .unwrap()
5133            .collect::<ArrowResult<Vec<_>>>()
5134            .unwrap();
5135        assert_eq!(batches.len(), 0);
5136    }
5137
5138    #[test]
5139    fn test_page_stats_not_written_by_default() {
5140        let string_field = Field::new("a", DataType::Utf8, false);
5141        let schema = Schema::new(vec![string_field]);
5142        let raw_string_values = vec!["Blart Versenwald III"];
5143        let string_values = StringArray::from(raw_string_values.clone());
5144        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();
5145
5146        let props = WriterProperties::builder()
5147            .set_statistics_enabled(EnabledStatistics::Page)
5148            .set_dictionary_enabled(false)
5149            .set_encoding(Encoding::PLAIN)
5150            .set_compression(crate::basic::Compression::UNCOMPRESSED)
5151            .build();
5152
5153        let file = roundtrip_opts(&batch, props);
5154
5155        // read file and decode page headers
5156        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
5157
5158        // decode first page header
5159        let first_page = &file[4..];
5160        let mut prot = ThriftSliceInputProtocol::new(first_page);
5161        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
5162        let stats = hdr.data_page_header.unwrap().statistics;
5163
5164        assert!(stats.is_none());
5165    }
5166
5167    #[test]
5168    fn test_page_stats_when_enabled() {
5169        let string_field = Field::new("a", DataType::Utf8, false);
5170        let schema = Schema::new(vec![string_field]);
5171        let raw_string_values = vec!["Blart Versenwald III", "Andrew Lamb"];
5172        let string_values = StringArray::from(raw_string_values.clone());
5173        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();
5174
5175        let props = WriterProperties::builder()
5176            .set_statistics_enabled(EnabledStatistics::Page)
5177            .set_dictionary_enabled(false)
5178            .set_encoding(Encoding::PLAIN)
5179            .set_write_page_header_statistics(true)
5180            .set_compression(crate::basic::Compression::UNCOMPRESSED)
5181            .build();
5182
5183        let file = roundtrip_opts(&batch, props);
5184
5185        // read file and decode page headers
5186        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
5187
5188        // decode first page header
5189        let first_page = &file[4..];
5190        let mut prot = ThriftSliceInputProtocol::new(first_page);
5191        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
5192        let stats = hdr.data_page_header.unwrap().statistics;
5193
5194        let stats = stats.unwrap();
5195        // check that min/max were actually written to the page
5196        assert!(stats.is_max_value_exact.unwrap());
5197        assert!(stats.is_min_value_exact.unwrap());
5198        assert_eq!(stats.max_value.unwrap(), "Blart Versenwald III".as_bytes());
5199        assert_eq!(stats.min_value.unwrap(), "Andrew Lamb".as_bytes());
5200    }
5201
5202    #[test]
5203    fn test_page_stats_truncation() {
5204        let string_field = Field::new("a", DataType::Utf8, false);
5205        let binary_field = Field::new("b", DataType::Binary, false);
5206        let schema = Schema::new(vec![string_field, binary_field]);
5207
5208        let raw_string_values = vec!["Blart Versenwald III"];
5209        let raw_binary_values = [b"Blart Versenwald III".to_vec()];
5210        let raw_binary_value_refs = raw_binary_values
5211            .iter()
5212            .map(|x| x.as_slice())
5213            .collect::<Vec<_>>();
5214
5215        let string_values = StringArray::from(raw_string_values.clone());
5216        let binary_values = BinaryArray::from(raw_binary_value_refs);
5217        let batch = RecordBatch::try_new(
5218            Arc::new(schema),
5219            vec![Arc::new(string_values), Arc::new(binary_values)],
5220        )
5221        .unwrap();
5222
5223        let props = WriterProperties::builder()
5224            .set_statistics_truncate_length(Some(2))
5225            .set_dictionary_enabled(false)
5226            .set_encoding(Encoding::PLAIN)
5227            .set_write_page_header_statistics(true)
5228            .set_compression(crate::basic::Compression::UNCOMPRESSED)
5229            .build();
5230
5231        let file = roundtrip_opts(&batch, props);
5232
5233        // read file and decode page headers
5234        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
5235
5236        // decode first page header
5237        let first_page = &file[4..];
5238        let mut prot = ThriftSliceInputProtocol::new(first_page);
5239        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
5240        let stats = hdr.data_page_header.unwrap().statistics;
5241        assert!(stats.is_some());
5242        let stats = stats.unwrap();
5243        // check that min/max were properly truncated
5244        assert!(!stats.is_max_value_exact.unwrap());
5245        assert!(!stats.is_min_value_exact.unwrap());
5246        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
5247        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
5248
5249        // check second page now
5250        let second_page = &prot.as_slice()[hdr.compressed_page_size as usize..];
5251        let mut prot = ThriftSliceInputProtocol::new(second_page);
5252        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
5253        let stats = hdr.data_page_header.unwrap().statistics;
5254        assert!(stats.is_some());
5255        let stats = stats.unwrap();
5256        // check that min/max were properly truncated
5257        assert!(!stats.is_max_value_exact.unwrap());
5258        assert!(!stats.is_min_value_exact.unwrap());
5259        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
5260        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
5261    }
5262
5263    #[test]
5264    fn test_page_encoding_statistics_roundtrip() {
5265        let batch_schema = Schema::new(vec![Field::new(
5266            "int32",
5267            arrow_schema::DataType::Int32,
5268            false,
5269        )]);
5270
5271        let batch = RecordBatch::try_new(
5272            Arc::new(batch_schema.clone()),
5273            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
5274        )
5275        .unwrap();
5276
5277        let mut file: File = tempfile::tempfile().unwrap();
5278        let mut writer = ArrowWriter::try_new(&mut file, Arc::new(batch_schema), None).unwrap();
5279        writer.write(&batch).unwrap();
5280        let file_metadata = writer.close().unwrap();
5281
5282        assert_eq!(file_metadata.num_row_groups(), 1);
5283        assert_eq!(file_metadata.row_group(0).num_columns(), 1);
5284        assert!(
5285            file_metadata
5286                .row_group(0)
5287                .column(0)
5288                .page_encoding_stats()
5289                .is_some()
5290        );
5291        let chunk_page_stats = file_metadata
5292            .row_group(0)
5293            .column(0)
5294            .page_encoding_stats()
5295            .unwrap();
5296
5297        // check that the read metadata is also correct
5298        let options = ReadOptionsBuilder::new()
5299            .with_page_index()
5300            .with_encoding_stats_as_mask(false)
5301            .build();
5302        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
5303
5304        let rowgroup = reader.get_row_group(0).expect("row group missing");
5305        assert_eq!(rowgroup.num_columns(), 1);
5306        let column = rowgroup.metadata().column(0);
5307        assert!(column.page_encoding_stats().is_some());
5308        let file_page_stats = column.page_encoding_stats().unwrap();
5309        assert_eq!(chunk_page_stats, file_page_stats);
5310    }
5311
5312    #[test]
5313    fn test_different_dict_page_size_limit() {
5314        let array = Arc::new(Int64Array::from_iter(0..1024 * 1024));
5315        let schema = Arc::new(Schema::new(vec![
5316            Field::new("col0", arrow_schema::DataType::Int64, false),
5317            Field::new("col1", arrow_schema::DataType::Int64, false),
5318        ]));
5319        let batch =
5320            arrow_array::RecordBatch::try_new(schema.clone(), vec![array.clone(), array]).unwrap();
5321
5322        let props = WriterProperties::builder()
5323            .set_dictionary_page_size_limit(1024 * 1024)
5324            .set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 1024 * 1024 * 4)
5325            .build();
5326        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
5327        writer.write(&batch).unwrap();
5328        let data = Bytes::from(writer.into_inner().unwrap());
5329
5330        let mut metadata = ParquetMetaDataReader::new();
5331        metadata.try_parse(&data).unwrap();
5332        let metadata = metadata.finish().unwrap();
5333        let col0_meta = metadata.row_group(0).column(0);
5334        let col1_meta = metadata.row_group(0).column(1);
5335
5336        let get_dict_page_size = move |meta: &ColumnChunkMetaData| {
5337            let mut reader =
5338                SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
5339            let page = reader.get_next_page().unwrap().unwrap();
5340            match page {
5341                Page::DictionaryPage { buf, .. } => buf.len(),
5342                _ => panic!("expected DictionaryPage"),
5343            }
5344        };
5345
5346        assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
5347        assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
5348    }
5349
5350    #[test]
5351    fn test_arrow_writer_granular_mode_roundtrip() {
5352        // Granular mode subdivides chunks and writes more pages than the
5353        // default batched path. Make sure the data we write back is
5354        // bit-identical to what went in — page-count assertions elsewhere
5355        // only prove pages were cut, not that the encoded data is correct.
5356        //
5357        // Mix value sizes so that the cumulative-byte-budget cutoff
5358        // lands mid-chunk, exercising both batched and granular paths
5359        // within the same `write_batch_internal` call.
5360        let small = "tiny".to_string();
5361        let big = "x".repeat(64 * 1024);
5362        let strings: Vec<String> = (0..256)
5363            .map(|i| {
5364                if i % 16 == 0 {
5365                    big.clone()
5366                } else {
5367                    small.clone()
5368                }
5369            })
5370            .collect();
5371
5372        let schema = Arc::new(Schema::new(vec![Field::new(
5373            "col",
5374            ArrowDataType::Utf8,
5375            false,
5376        )]));
5377        let batch = RecordBatch::try_new(
5378            schema.clone(),
5379            vec![Arc::new(StringArray::from(strings.clone())) as _],
5380        )
5381        .unwrap();
5382
5383        let props = WriterProperties::builder()
5384            .set_dictionary_enabled(false)
5385            .set_data_page_size_limit(16 * 1024)
5386            .build();
5387        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
5388        writer.write(&batch).unwrap();
5389        let data = Bytes::from(writer.into_inner().unwrap());
5390
5391        let mut reader = ParquetRecordBatchReader::try_new(data, 1024).unwrap();
5392        let read = reader.next().unwrap().unwrap();
5393        assert!(reader.next().is_none(), "expected one batch");
5394        let col = read
5395            .column(0)
5396            .as_any()
5397            .downcast_ref::<StringArray>()
5398            .unwrap();
5399        assert_eq!(col.len(), strings.len());
5400        for (i, expected) in strings.iter().enumerate() {
5401            assert_eq!(
5402                col.value(i),
5403                expected.as_str(),
5404                "value mismatch at index {i}"
5405            );
5406        }
5407    }
5408
5409    #[test]
5410    fn test_arrow_writer_all_null_string_column() {
5411        // The `LevelDataRef::value_count` Uniform branch with
5412        // `value != max_def` (entirely-null chunk) must return 0 so the
5413        // sub-batch sizer short-circuits to batch mode without trying
5414        // to estimate byte budgets for non-existent values.
5415        let num_rows = 1024;
5416        let schema = Arc::new(Schema::new(vec![Field::new(
5417            "col",
5418            ArrowDataType::Utf8,
5419            true,
5420        )]));
5421        let nulls: Vec<Option<&str>> = vec![None; num_rows];
5422        let batch = RecordBatch::try_new(
5423            schema.clone(),
5424            vec![Arc::new(StringArray::from(nulls)) as _],
5425        )
5426        .unwrap();
5427
5428        let props = WriterProperties::builder()
5429            .set_dictionary_enabled(false)
5430            .set_data_page_size_limit(16 * 1024)
5431            .build();
5432        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
5433        writer.write(&batch).unwrap();
5434        let data = Bytes::from(writer.into_inner().unwrap());
5435
5436        // Re-parse the file: row group has one column, every row is
5437        // null, all data pages report `num_rows / page_count` rows.
5438        let mut metadata = ParquetMetaDataReader::new();
5439        metadata.try_parse(&data).unwrap();
5440        let metadata = metadata.finish().unwrap();
5441        let row_group = metadata.row_group(0);
5442        let col_meta = row_group.column(0);
5443        assert_eq!(row_group.num_rows() as usize, num_rows);
5444        // Statistics record `null_count = num_rows` — proves every value
5445        // was written as null.
5446        if let Some(stats) = col_meta.statistics() {
5447            assert_eq!(
5448                stats.null_count_opt().unwrap_or(0) as usize,
5449                num_rows,
5450                "expected all-null column to report null_count = num_rows"
5451            );
5452        }
5453
5454        let mut reader =
5455            SerializedPageReader::new(Arc::new(data.clone()), col_meta, num_rows, None).unwrap();
5456        let mut total_values = 0u32;
5457        while let Some(page) = reader.get_next_page().unwrap() {
5458            if matches!(page, Page::DataPage { .. } | Page::DataPageV2 { .. }) {
5459                total_values += page.num_values();
5460            }
5461        }
5462        assert_eq!(
5463            total_values as usize, num_rows,
5464            "expected every level position to be represented in some page"
5465        );
5466    }
5467
5468    struct WriteBatchesShape {
5469        num_batches: usize,
5470        rows_per_batch: usize,
5471        row_size: usize,
5472    }
5473
5474    /// Helper function to write batches with the provided `WriteBatchesShape` into an `ArrowWriter`
5475    fn write_batches(
5476        WriteBatchesShape {
5477            num_batches,
5478            rows_per_batch,
5479            row_size,
5480        }: WriteBatchesShape,
5481        props: WriterProperties,
5482    ) -> ParquetRecordBatchReaderBuilder<File> {
5483        let schema = Arc::new(Schema::new(vec![Field::new(
5484            "str",
5485            ArrowDataType::Utf8,
5486            false,
5487        )]));
5488        let file = tempfile::tempfile().unwrap();
5489        let mut writer =
5490            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
5491
5492        for batch_idx in 0..num_batches {
5493            let strings: Vec<String> = (0..rows_per_batch)
5494                .map(|i| format!("{:0>width$}", batch_idx * 10 + i, width = row_size))
5495                .collect();
5496            let array = StringArray::from(strings);
5497            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
5498            writer.write(&batch).unwrap();
5499        }
5500        writer.close().unwrap();
5501        ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
5502    }
5503
5504    #[test]
5505    // When both limits are None, all data should go into a single row group
5506    fn test_row_group_limit_none_writes_single_row_group() {
5507        let props = WriterProperties::builder()
5508            .set_max_row_group_row_count(None)
5509            .set_max_row_group_bytes(None)
5510            .build();
5511
5512        let builder = write_batches(
5513            WriteBatchesShape {
5514                num_batches: 1,
5515                rows_per_batch: 1000,
5516                row_size: 4,
5517            },
5518            props,
5519        );
5520
5521        assert_eq!(
5522            &row_group_sizes(builder.metadata()),
5523            &[1000],
5524            "With no limits, all rows should be in a single row group"
5525        );
5526    }
5527
5528    #[test]
5529    // When only max_row_group_size is set, respect the row limit
5530    fn test_row_group_limit_rows_only() {
5531        let props = WriterProperties::builder()
5532            .set_max_row_group_row_count(Some(300))
5533            .set_max_row_group_bytes(None)
5534            .build();
5535
5536        let builder = write_batches(
5537            WriteBatchesShape {
5538                num_batches: 1,
5539                rows_per_batch: 1000,
5540                row_size: 4,
5541            },
5542            props,
5543        );
5544
5545        assert_eq!(
5546            &row_group_sizes(builder.metadata()),
5547            &[300, 300, 300, 100],
5548            "Row groups should be split by row count"
5549        );
5550    }
5551
5552    #[test]
5553    // When only max_row_group_bytes is set, respect the byte limit
5554    fn test_row_group_limit_bytes_only() {
5555        let props = WriterProperties::builder()
5556            .set_max_row_group_row_count(None)
5557            // Set byte limit to approximately fit ~30 rows worth of data (~100 bytes each)
5558            .set_max_row_group_bytes(Some(3500))
5559            .build();
5560
5561        let builder = write_batches(
5562            WriteBatchesShape {
5563                num_batches: 10,
5564                rows_per_batch: 10,
5565                row_size: 100,
5566            },
5567            props,
5568        );
5569
5570        let sizes = row_group_sizes(builder.metadata());
5571
5572        assert!(
5573            sizes.len() > 1,
5574            "Should have multiple row groups due to byte limit, got {sizes:?}",
5575        );
5576
5577        let total_rows: i64 = sizes.iter().sum();
5578        assert_eq!(total_rows, 100, "Total rows should be preserved");
5579    }
5580
5581    #[test]
5582    // If an in-progress row group is already oversized, it should be flushed before writing more.
5583    fn test_row_group_limit_bytes_flushes_when_current_group_already_too_large() {
5584        let schema = Arc::new(Schema::new(vec![Field::new(
5585            "str",
5586            ArrowDataType::Utf8,
5587            false,
5588        )]));
5589        let file = tempfile::tempfile().unwrap();
5590
5591        // Start with no byte limit so we can intentionally build an oversized in-progress row group.
5592        let props = WriterProperties::builder()
5593            .set_max_row_group_row_count(None)
5594            .set_max_row_group_bytes(None)
5595            .build();
5596        let mut writer =
5597            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
5598
5599        let first_array = StringArray::from(
5600            (0..10)
5601                .map(|i| format!("{:0>100}", i))
5602                .collect::<Vec<String>>(),
5603        );
5604        let first_batch =
5605            RecordBatch::try_new(schema.clone(), vec![Arc::new(first_array)]).unwrap();
5606        writer.write(&first_batch).unwrap();
5607        assert_eq!(writer.in_progress_rows(), 10);
5608
5609        // Tighten the limit below the current in-progress bytes to exercise:
5610        // `if current_bytes >= max_bytes { self.flush()?; ... }`
5611        writer.max_row_group_bytes = Some(1);
5612
5613        let second_array = StringArray::from(vec!["x".to_string()]);
5614        let second_batch =
5615            RecordBatch::try_new(schema.clone(), vec![Arc::new(second_array)]).unwrap();
5616        writer.write(&second_batch).unwrap();
5617        writer.close().unwrap();
5618        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
5619
5620        assert_eq!(
5621            &row_group_sizes(builder.metadata()),
5622            &[10, 1],
5623            "The second write should flush an oversized in-progress row group first",
5624        );
5625    }
5626
5627    #[test]
5628    // When both limits are set, the row limit triggers first
5629    fn test_row_group_limit_both_row_wins_single_batch() {
5630        let props = WriterProperties::builder()
5631            .set_max_row_group_row_count(Some(200)) // Will trigger at 200 rows
5632            .set_max_row_group_bytes(Some(1024 * 1024)) // 1MB - won't trigger for small int data
5633            .build();
5634
5635        let builder = write_batches(
5636            WriteBatchesShape {
5637                num_batches: 1,
5638                row_size: 4,
5639                rows_per_batch: 1000,
5640            },
5641            props,
5642        );
5643
5644        assert_eq!(
5645            &row_group_sizes(builder.metadata()),
5646            &[200, 200, 200, 200, 200],
5647            "Row limit should trigger before byte limit"
5648        );
5649    }
5650
5651    #[test]
5652    // When both limits are set, the row limit triggers first
5653    fn test_row_group_limit_both_row_wins_multiple_batches() {
5654        let props = WriterProperties::builder()
5655            .set_max_row_group_row_count(Some(5)) // Will trigger every 5 rows
5656            .set_max_row_group_bytes(Some(9999)) // Won't trigger
5657            .build();
5658
5659        let builder = write_batches(
5660            WriteBatchesShape {
5661                num_batches: 10,
5662                rows_per_batch: 10,
5663                row_size: 100,
5664            },
5665            props,
5666        );
5667
5668        assert_eq!(
5669            &row_group_sizes(builder.metadata()),
5670            &[5; 20],
5671            "Row limit should trigger before byte limit"
5672        );
5673    }
5674
5675    #[test]
5676    // When both limits are set, the byte limit triggers first
5677    fn test_row_group_limit_both_bytes_wins() {
5678        let props = WriterProperties::builder()
5679            .set_max_row_group_row_count(Some(1000)) // Won't trigger for 100 rows
5680            .set_max_row_group_bytes(Some(3500)) // Will trigger at ~30-35 rows
5681            .build();
5682
5683        let builder = write_batches(
5684            WriteBatchesShape {
5685                num_batches: 10,
5686                rows_per_batch: 10,
5687                row_size: 100,
5688            },
5689            props,
5690        );
5691
5692        let sizes = row_group_sizes(builder.metadata());
5693
5694        assert!(
5695            sizes.len() > 1,
5696            "Byte limit should trigger before row limit, got {sizes:?}",
5697        );
5698
5699        assert!(
5700            sizes.iter().all(|&s| s < 1000),
5701            "No row group should hit the row limit"
5702        );
5703
5704        let total_rows: i64 = sizes.iter().sum();
5705        assert_eq!(total_rows, 100, "Total rows should be preserved");
5706    }
5707
5708    #[test]
5709    fn arrow_column_chunk_close_mut_drops_column_index() {
5710        use crate::arrow::ArrowSchemaConverter;
5711        use crate::file::writer::SerializedFileWriter;
5712
5713        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
5714        let props = Arc::new(
5715            WriterProperties::builder()
5716                .set_statistics_enabled(EnabledStatistics::Page)
5717                .build(),
5718        );
5719        let parquet_schema = ArrowSchemaConverter::new()
5720            .with_coerce_types(props.coerce_types())
5721            .convert(&schema)
5722            .unwrap();
5723
5724        let mut buf = Vec::with_capacity(1024);
5725        let mut writer =
5726            SerializedFileWriter::new(&mut buf, parquet_schema.root_schema_ptr(), props.clone())
5727                .unwrap();
5728
5729        let factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(&schema));
5730        let mut col_writers = factory.create_column_writers(0).unwrap();
5731        let arr: ArrayRef = Arc::new(Int32Array::from_iter_values(0..64));
5732        for leaves in compute_leaves(schema.field(0), &arr).unwrap() {
5733            col_writers[0].write(&leaves).unwrap();
5734        }
5735        let mut chunk = col_writers.pop().unwrap().close().unwrap();
5736
5737        // Immutable accessor exposes the close result produced at close time.
5738        assert!(
5739            chunk.close().column_index.is_some(),
5740            "EnabledStatistics::Page should produce a column_index"
5741        );
5742
5743        // Mutable accessor lets callers drop the page-level index before append.
5744        chunk.close_mut().column_index = None;
5745        assert!(chunk.close().column_index.is_none());
5746
5747        let mut rg = writer.next_row_group().unwrap();
5748        chunk.append_to_row_group(&mut rg).unwrap();
5749        rg.close().unwrap();
5750        let file_meta = writer.close().unwrap();
5751
5752        // After dropping column_index, the resulting file records no column
5753        // index offset/length for this chunk.
5754        let cc = file_meta.row_group(0).column(0);
5755        assert!(cc.column_index_range().is_none());
5756    }
5757
5758    /// Writes a single-column RecordBatch to an in-memory Parquet buffer.
5759    fn write_column_to_bytes(array: ArrayRef) -> Bytes {
5760        let schema = Arc::new(Schema::new(vec![Field::new(
5761            "col",
5762            array.data_type().clone(),
5763            true,
5764        )]));
5765        let buf = get_bytes_after_close(
5766            schema.clone(),
5767            &RecordBatch::try_new(schema, vec![array]).unwrap(),
5768        );
5769        Bytes::from(buf)
5770    }
5771
5772    /// Reads column 0 from a single-row-group Parquet buffer, projecting it with the given schema.
5773    /// Passing a flat schema when the buffer was written from a REE array lets callers decode
5774    /// the physical values without the run-end encoding wrapper.
5775    fn read_column_with_schema(bytes: Bytes, schema: SchemaRef) -> ArrayRef {
5776        let opts = crate::arrow::arrow_reader::ArrowReaderOptions::new().with_schema(schema);
5777        ParquetRecordBatchReaderBuilder::try_new_with_options(bytes, opts)
5778            .unwrap()
5779            .build()
5780            .unwrap()
5781            .next()
5782            .unwrap()
5783            .unwrap()
5784            .column(0)
5785            .clone()
5786    }
5787
5788    fn ree_write_read_roundtrip(ree: ArrayRef, flat: ArrayRef) {
5789        let flat_schema = Arc::new(Schema::new(vec![Field::new(
5790            "col",
5791            flat.data_type().clone(),
5792            true,
5793        )]));
5794        let ree_bytes = write_column_to_bytes(ree);
5795        let flat_bytes = write_column_to_bytes(flat.clone());
5796        assert_eq!(
5797            ree_bytes, flat_bytes,
5798            "REE and flat bytes should be identical"
5799        );
5800
5801        let decoded_ree = read_column_with_schema(ree_bytes, flat_schema.clone());
5802        let decoded_flat = read_column_with_schema(flat_bytes, flat_schema);
5803
5804        assert_eq!(decoded_ree.as_ref(), flat.as_ref());
5805        assert_eq!(decoded_ree.as_ref(), decoded_flat.as_ref());
5806    }
5807
5808    #[test]
5809    fn ree_string() {
5810        let ree: ArrayRef = Arc::new(
5811            [Some("a"), Some("a"), None, Some("b"), Some("b")]
5812                .into_iter()
5813                .collect::<Int32RunArray>(),
5814        );
5815        let flat: ArrayRef = Arc::new(StringArray::from(vec![
5816            Some("a"),
5817            Some("a"),
5818            None,
5819            Some("b"),
5820            Some("b"),
5821        ]));
5822        ree_write_read_roundtrip(ree, flat);
5823    }
5824
5825    #[test]
5826    fn ree_int32() {
5827        let mut b = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
5828        for v in [Some(1), Some(1), None, Some(2), Some(2)] {
5829            b.append_option(v);
5830        }
5831        let ree: ArrayRef = Arc::new(b.finish());
5832        let flat: ArrayRef = Arc::new(Int32Array::from(vec![
5833            Some(1),
5834            Some(1),
5835            None,
5836            Some(2),
5837            Some(2),
5838        ]));
5839        ree_write_read_roundtrip(ree, flat);
5840    }
5841
5842    #[test]
5843    fn ree_bool() {
5844        // run_ends [3, 5, 7] → [T,T,T, null,null, F,F]
5845        let ree: ArrayRef = Arc::new(
5846            RunArray::try_new(
5847                &Int32Array::from(vec![3, 5, 7]),
5848                &BooleanArray::from(vec![Some(true), None, Some(false)]),
5849            )
5850            .unwrap(),
5851        );
5852        let flat: ArrayRef = Arc::new(BooleanArray::from(vec![
5853            Some(true),
5854            Some(true),
5855            Some(true),
5856            None,
5857            None,
5858            Some(false),
5859            Some(false),
5860        ]));
5861        ree_write_read_roundtrip(ree, flat);
5862    }
5863
5864    #[test]
5865    fn ree_fixed_size_binary() {
5866        let mk = |vals: &[Option<&[u8]>]| -> FixedSizeBinaryArray {
5867            let mut b = FixedSizeBinaryBuilder::new(2);
5868            for v in vals {
5869                match v {
5870                    Some(x) => b.append_value(x).unwrap(),
5871                    None => b.append_null(),
5872                }
5873            }
5874            b.finish()
5875        };
5876        // run_ends [2, 4, 6] → [aa,aa, null,null, bb,bb]
5877        let ree: ArrayRef = Arc::new(
5878            RunArray::try_new(
5879                &Int32Array::from(vec![2, 4, 6]),
5880                &mk(&[Some(b"aa"), None, Some(b"bb")]),
5881            )
5882            .unwrap(),
5883        );
5884        let flat: ArrayRef = Arc::new(mk(&[
5885            Some(b"aa"),
5886            Some(b"aa"),
5887            None,
5888            None,
5889            Some(b"bb"),
5890            Some(b"bb"),
5891        ]));
5892        ree_write_read_roundtrip(ree, flat);
5893    }
5894
5895    #[test]
5896    fn ree_single_run() {
5897        let ree: ArrayRef = Arc::new(["x", "x", "x"].into_iter().collect::<Int32RunArray>());
5898        let flat: ArrayRef = Arc::new(StringArray::from(vec!["x", "x", "x"]));
5899        ree_write_read_roundtrip(ree, flat);
5900    }
5901
5902    #[test]
5903    fn ree_float32() {
5904        // run_ends [2, 4, 5] → [1.0, 1.0, null, null, 2.5]
5905        let ree: ArrayRef = Arc::new(
5906            RunArray::try_new(
5907                &Int32Array::from(vec![2, 4, 5]),
5908                &Float32Array::from(vec![Some(1.0_f32), None, Some(2.5_f32)]),
5909            )
5910            .unwrap(),
5911        );
5912        let flat: ArrayRef = Arc::new(Float32Array::from(vec![
5913            Some(1.0_f32),
5914            Some(1.0_f32),
5915            None,
5916            None,
5917            Some(2.5_f32),
5918        ]));
5919        ree_write_read_roundtrip(ree, flat);
5920    }
5921
5922    #[test]
5923    fn ree_sliced() {
5924        // A sliced (non-zero offset) REE array: verify that get_physical_index
5925        // correctly accounts for the logical offset when expanding.
5926        // Full array: run_ends [3, 5, 7] → [a,a,a, b,b, c,c]
5927        // After slice(2, 5) the logical view is [a, b, b, c, c].
5928        let full: ArrayRef = Arc::new(
5929            RunArray::try_new(
5930                &Int32Array::from(vec![3, 5, 7]),
5931                &StringArray::from(vec!["a", "b", "c"]),
5932            )
5933            .unwrap(),
5934        );
5935        let sliced = full.slice(2, 5);
5936        let flat: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "b", "c", "c"]));
5937        ree_write_read_roundtrip(sliced, flat);
5938    }
5939
5940    #[test]
5941    fn ree_struct_with_ree_child() {
5942        // Struct with a REE string field and a REE int field — confirms
5943        // recursion visits every child and each collapses to the right leaf type.
5944        let run_ends = Int32Array::from(vec![2i32, 3, 5]);
5945
5946        let col_a: ArrayRef = Arc::new(
5947            RunArray::try_new(
5948                &run_ends,
5949                &StringArray::from(vec![Some("foo"), None, Some("bar")]),
5950            )
5951            .unwrap(),
5952        );
5953        let col_b: ArrayRef = Arc::new(
5954            RunArray::try_new(&run_ends, &Int32Array::from(vec![Some(1), None, Some(2)])).unwrap(),
5955        );
5956
5957        let struct_array: ArrayRef = Arc::new(StructArray::new(
5958            Fields::from(vec![
5959                Field::new("a", col_a.data_type().clone(), true),
5960                Field::new("b", col_b.data_type().clone(), true),
5961            ]),
5962            vec![col_a, col_b],
5963            None,
5964        ));
5965
5966        let schema = Arc::new(Schema::new(vec![Field::new(
5967            "row",
5968            struct_array.data_type().clone(),
5969            true,
5970        )]));
5971        let batch = RecordBatch::try_new(schema.clone(), vec![struct_array]).unwrap();
5972
5973        let mut buf = Vec::new();
5974        let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap();
5975        writer.write(&batch).unwrap();
5976        let metadata = writer.close().unwrap();
5977
5978        let parquet_schema = metadata.file_metadata().schema_descr();
5979        assert_eq!(parquet_schema.num_columns(), 2);
5980        assert_eq!(
5981            parquet_schema.column(0).physical_type(),
5982            crate::basic::Type::BYTE_ARRAY
5983        );
5984        assert_eq!(parquet_schema.column(0).path().string(), "row.a");
5985        assert_eq!(
5986            parquet_schema.column(1).physical_type(),
5987            crate::basic::Type::INT32
5988        );
5989        assert_eq!(parquet_schema.column(1).path().string(), "row.b");
5990    }
5991}