parquet/arrow/arrow_writer/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Contains a writer that writes Arrow data into Parquet files.

use crate::column::chunker::ContentDefinedChunker;

use bytes::Bytes;
use std::io::{Read, Write};
use std::iter::Peekable;
use std::slice::Iter;
use std::sync::{Arc, Mutex};
use std::vec::IntoIter;

use arrow_array::cast::AsArray;
use arrow_array::types::*;
use arrow_array::{ArrayRef, Int32Array, RecordBatch, RecordBatchWriter};
use arrow_schema::{
    ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef, TimeUnit,
};

use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};

use crate::arrow::ArrowSchemaConverter;
use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
use crate::column::page_encryption::PageEncryptor;
use crate::column::writer::encoder::ColumnValueEncoder;
use crate::column::writer::{
    ColumnCloseResult, ColumnWriter, GenericColumnWriter, get_column_writer,
};
use crate::data_type::{ByteArray, FixedLenByteArray};
#[cfg(feature = "encryption")]
use crate::encryption::encrypt::FileEncryptor;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaData};
use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
use crate::file::reader::{ChunkReader, Length};
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
use levels::{ArrayLevels, calculate_array_levels};

mod byte_array;
mod levels;

/// Encodes [`RecordBatch`] to parquet
///
/// Writes Arrow `RecordBatch`es to a Parquet writer. Multiple [`RecordBatch`]es will be encoded
/// into the same row group, up to `max_row_group_size` rows. Any remaining rows will be
/// flushed on close, leading the final row group in the output file to potentially
/// contain fewer than `max_row_group_size` rows.
///
/// # Example: Writing `RecordBatch`es
/// ```
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// # use arrow_array::{ArrayRef, Int64Array};
/// # use arrow_array::RecordBatch;
/// # use parquet::arrow::arrow_writer::ArrowWriter;
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
/// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
///
/// let mut buffer = Vec::new();
/// let mut writer = ArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
/// writer.write(&to_write).unwrap();
/// writer.close().unwrap();
///
/// let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap();
/// let read = reader.next().unwrap().unwrap();
///
/// assert_eq!(to_write, read);
/// ```
///
/// # Memory Usage and Limiting
///
/// The nature of Parquet requires buffering of an entire row group before it can
/// be flushed to the underlying writer. Data is mostly buffered in its encoded
/// form, reducing memory usage. However, some data such as dictionary keys,
/// large strings or deeply nested data may still result in non-trivial memory
/// usage.
///
/// See Also:
/// * [`ArrowWriter::memory_size`]: the current memory usage of the writer.
/// * [`ArrowWriter::in_progress_size`]: the estimated size of the buffered row group.
///
/// Call [`Self::flush`] to trigger an early flush of a row group based on a
/// memory threshold and/or global memory pressure. However, smaller row groups
/// result in higher metadata overheads, and thus may worsen compression ratios
/// and query performance.
///
/// ```no_run
/// # use std::io::Write;
/// # use arrow_array::RecordBatch;
/// # use parquet::arrow::ArrowWriter;
/// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
/// # let batch: RecordBatch = todo!();
/// writer.write(&batch).unwrap();
/// // Trigger an early flush if anticipated size exceeds 1_000_000
/// if writer.in_progress_size() > 1_000_000 {
///     writer.flush().unwrap();
/// }
/// ```
///
/// ## Type Support
///
/// The writer supports writing all Arrow [`DataType`]s that have a direct mapping to
/// Parquet types including [`StructArray`] and [`ListArray`].
///
/// The following are not supported:
///
/// * [`IntervalMonthDayNanoArray`]: Parquet does not [support nanosecond intervals].
///
/// [`DataType`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html
/// [`StructArray`]: https://docs.rs/arrow/latest/arrow/array/struct.StructArray.html
/// [`ListArray`]: https://docs.rs/arrow/latest/arrow/array/type.ListArray.html
/// [`IntervalMonthDayNanoArray`]: https://docs.rs/arrow/latest/arrow/array/type.IntervalMonthDayNanoArray.html
/// [support nanosecond intervals]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
///
/// ## Type Compatibility
/// The writer can write Arrow [`RecordBatch`]es whose schemas are logically equivalent to the
/// writer's schema. This means that for a given column, the writer can accept multiple Arrow
/// [`DataType`]s that contain the same value type.
///
/// For example, the following [`DataType`]s are all logically equivalent and can be written
/// to the same column:
/// * String, LargeString, StringView
/// * Binary, LargeBinary, BinaryView
///
/// The writer will also accept both native and dictionary encoded arrays, provided the
/// dictionaries contain compatible values.
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{DictionaryArray, LargeStringArray, RecordBatch, StringArray, UInt8Array};
/// # use arrow_schema::{DataType, Field, Schema};
/// # use parquet::arrow::arrow_writer::ArrowWriter;
/// let record_batch1 = RecordBatch::try_new(
///     Arc::new(Schema::new(vec![Field::new("col", DataType::LargeUtf8, false)])),
///     vec![Arc::new(LargeStringArray::from_iter_values(vec!["a", "b"]))],
/// )
/// .unwrap();
///
/// let mut buffer = Vec::new();
/// let mut writer = ArrowWriter::try_new(&mut buffer, record_batch1.schema(), None).unwrap();
/// writer.write(&record_batch1).unwrap();
///
/// let record_batch2 = RecordBatch::try_new(
///     Arc::new(Schema::new(vec![Field::new(
///         "col",
///         DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
///         false,
///     )])),
///     vec![Arc::new(DictionaryArray::new(
///         UInt8Array::from_iter_values(vec![0, 1]),
///         Arc::new(StringArray::from_iter_values(vec!["b", "c"])),
///     ))],
/// )
/// .unwrap();
/// writer.write(&record_batch2).unwrap();
/// writer.close().unwrap();
/// ```
pub struct ArrowWriter<W: Write> {
    /// Underlying Parquet writer
    writer: SerializedFileWriter<W>,

    /// The in-progress row group if any
    in_progress: Option<ArrowRowGroupWriter>,

    /// A copy of the Arrow schema.
    ///
    /// The schema is used to verify that each record batch written has the correct schema
    arrow_schema: SchemaRef,

    /// Creates new [`ArrowRowGroupWriter`] instances as required
    row_group_writer_factory: ArrowRowGroupWriterFactory,

    /// The maximum number of rows to write to each row group, or None for unlimited
    max_row_group_row_count: Option<usize>,

    /// The maximum size in bytes for a row group, or None for unlimited
    max_row_group_bytes: Option<usize>,

    /// CDC chunkers persisted across row groups (one per leaf column).
    cdc_chunkers: Option<Vec<ContentDefinedChunker>>,
}

impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let buffered_memory = self.in_progress_size();
        f.debug_struct("ArrowWriter")
            .field("writer", &self.writer)
            .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
            .field("in_progress_rows", &self.in_progress_rows())
            .field("arrow_schema", &self.arrow_schema)
            .field("max_row_group_row_count", &self.max_row_group_row_count)
            .field("max_row_group_bytes", &self.max_row_group_bytes)
            .finish()
    }
}

impl<W: Write + Send> ArrowWriter<W> {
    /// Try to create a new Arrow writer
    ///
    /// The writer will fail if:
    ///  * a `SerializedFileWriter` cannot be created from the provided writer
    ///  * the Arrow schema contains unsupported datatypes such as Unions
    pub fn try_new(
        writer: W,
        arrow_schema: SchemaRef,
        props: Option<WriterProperties>,
    ) -> Result<Self> {
        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
        Self::try_new_with_options(writer, arrow_schema, options)
    }

    /// Try to create a new Arrow writer with [`ArrowWriterOptions`].
    ///
    /// The writer will fail if:
    ///  * a `SerializedFileWriter` cannot be created from the provided writer
    ///  * the Arrow schema contains unsupported datatypes such as Unions
    pub fn try_new_with_options(
        writer: W,
        arrow_schema: SchemaRef,
        options: ArrowWriterOptions,
    ) -> Result<Self> {
        let mut props = options.properties;

        let schema = if let Some(parquet_schema) = options.schema_descr {
            parquet_schema.clone()
        } else {
            let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
            if let Some(schema_root) = &options.schema_root {
                converter = converter.schema_root(schema_root);
            }

            converter.convert(&arrow_schema)?
        };

        if !options.skip_arrow_metadata {
            // add serialized arrow schema
            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
        }

        let max_row_group_row_count = props.max_row_group_row_count();
        let max_row_group_bytes = props.max_row_group_bytes();

        let props_ptr = Arc::new(props);
        let file_writer =
            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;

        let row_group_writer_factory =
            ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone());

        let cdc_chunkers = props_ptr
            .content_defined_chunking()
            .map(|opts| {
                file_writer
                    .schema_descr()
                    .columns()
                    .iter()
                    .map(|desc| ContentDefinedChunker::new(desc, opts))
                    .collect::<Result<Vec<_>>>()
            })
            .transpose()?;

        Ok(Self {
            writer: file_writer,
            in_progress: None,
            arrow_schema,
            row_group_writer_factory,
            max_row_group_row_count,
            max_row_group_bytes,
            cdc_chunkers,
        })
    }

    /// Returns metadata for any flushed row groups
    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
        self.writer.flushed_row_groups()
    }

    /// Estimated memory usage, in bytes, of this `ArrowWriter`
    ///
    /// This estimate is formed by summing the values of
    /// [`ArrowColumnWriter::memory_size`] for all in-progress columns.
    pub fn memory_size(&self) -> usize {
        match &self.in_progress {
            Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
            None => 0,
        }
    }

    /// Anticipated encoded size of the in progress row group.
    ///
    /// This estimates the size of the row group once it has been completely
    /// encoded, and is formed by summing the values of
    /// [`ArrowColumnWriter::get_estimated_total_bytes`] for all in progress
    /// columns.
    pub fn in_progress_size(&self) -> usize {
        match &self.in_progress {
            Some(in_progress) => in_progress
                .writers
                .iter()
                .map(|x| x.get_estimated_total_bytes())
                .sum(),
            None => 0,
        }
    }

    /// Returns the number of rows buffered in the in progress row group
    pub fn in_progress_rows(&self) -> usize {
        self.in_progress
            .as_ref()
            .map(|x| x.buffered_rows)
            .unwrap_or_default()
    }

    /// Returns the number of bytes written by this instance
    pub fn bytes_written(&self) -> usize {
        self.writer.bytes_written()
    }

    /// Encodes the provided [`RecordBatch`]
    ///
    /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_row_count`]
    /// rows or [`WriterProperties::max_row_group_bytes`] bytes, the contents of `batch` will be
    /// written to one or more row groups such that limits are respected.
    ///
    /// If both limits are `None`, all data is written to a single row group.
    /// If one limit is set, that limit is respected.
    /// If both limits are set, the lower bound (whichever triggers first) is respected.
    ///
    /// This will fail if the `batch`'s schema does not match the writer's schema.
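    ///
    /// # Example: explicit row group boundaries
    ///
    /// A minimal sketch using only the APIs on this type: [`Self::flush`] ends the
    /// in-progress row group, so explicitly flushed batches land in their own row group.
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
    /// # use parquet::arrow::arrow_writer::ArrowWriter;
    /// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
    /// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
    ///
    /// let mut buffer = Vec::new();
    /// let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
    /// writer.write(&batch).unwrap();
    /// writer.flush().unwrap(); // closes the first row group
    /// writer.write(&batch).unwrap();
    /// assert_eq!(writer.flushed_row_groups().len(), 1);
    /// writer.close().unwrap(); // flushes the second row group
    /// ```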
    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        if batch.num_rows() == 0 {
            return Ok(());
        }

        let in_progress = match &mut self.in_progress {
            Some(in_progress) => in_progress,
            x => x.insert(
                self.row_group_writer_factory
                    .create_row_group_writer(self.writer.flushed_row_groups().len())?,
            ),
        };

        if let Some(max_rows) = self.max_row_group_row_count {
            if in_progress.buffered_rows + batch.num_rows() > max_rows {
                let to_write = max_rows - in_progress.buffered_rows;
                let a = batch.slice(0, to_write);
                let b = batch.slice(to_write, batch.num_rows() - to_write);
                self.write(&a)?;
                return self.write(&b);
            }
        }

        // Check byte limit: if we have buffered data, use measured average row size
        // to split batch proactively before exceeding byte limit
        if let Some(max_bytes) = self.max_row_group_bytes {
            if in_progress.buffered_rows > 0 {
                let current_bytes = in_progress.get_estimated_total_bytes();

                if current_bytes >= max_bytes {
                    self.flush()?;
                    return self.write(batch);
                }

                let avg_row_bytes = current_bytes / in_progress.buffered_rows;
                if avg_row_bytes > 0 {
                    // At this point, `current_bytes < max_bytes` (checked above)
                    let remaining_bytes = max_bytes - current_bytes;
                    let rows_that_fit = remaining_bytes / avg_row_bytes;

                    if batch.num_rows() > rows_that_fit {
                        if rows_that_fit > 0 {
                            let a = batch.slice(0, rows_that_fit);
                            let b = batch.slice(rows_that_fit, batch.num_rows() - rows_that_fit);
                            self.write(&a)?;
                            return self.write(&b);
                        } else {
                            self.flush()?;
                            return self.write(batch);
                        }
                    }
                }
            }
        }

        match self.cdc_chunkers.as_mut() {
            Some(chunkers) => in_progress.write_with_chunkers(batch, chunkers)?,
            None => in_progress.write(batch)?,
        }

        let should_flush = self
            .max_row_group_row_count
            .is_some_and(|max| in_progress.buffered_rows >= max)
            || self
                .max_row_group_bytes
                .is_some_and(|max| in_progress.get_estimated_total_bytes() >= max);

        if should_flush {
            self.flush()?
        }
        Ok(())
    }

    /// Writes the given bytes to the internal buffer.
    ///
    /// It is safe to use this method to write data to the underlying writer,
    /// because it ensures that the buffering and byte-counting layers are used.
    pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
        self.writer.write_all(buf)
    }

    /// Flushes underlying writer
    pub fn sync(&mut self) -> std::io::Result<()> {
        self.writer.flush()
    }

    /// Flushes all buffered rows into a new row group
    ///
    /// Note the underlying writer is not flushed by this call.
    /// If that behavior is desired, call [`ArrowWriter::sync`] as well.
    pub fn flush(&mut self) -> Result<()> {
        let in_progress = match self.in_progress.take() {
            Some(in_progress) => in_progress,
            None => return Ok(()),
        };

        let mut row_group_writer = self.writer.next_row_group()?;
        for chunk in in_progress.close()? {
            chunk.append_to_row_group(&mut row_group_writer)?;
        }
        row_group_writer.close()?;
        Ok(())
    }

    /// Additional [`KeyValue`] metadata to be written in addition to those from [`WriterProperties`]
    ///
    /// This method provides a way to append key-value metadata after writing `RecordBatch`es.
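    ///
    /// A short sketch (assuming `KeyValue::new` accepts a key and an optional value):
    /// ```no_run
    /// # use parquet::arrow::arrow_writer::ArrowWriter;
    /// # use parquet::file::metadata::KeyValue;
    /// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
    /// writer.append_key_value_metadata(KeyValue::new(
    ///     "author".to_string(),
    ///     Some("example".to_string()),
    /// ));
    /// ```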
    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
        self.writer.append_key_value_metadata(kv_metadata)
    }

    /// Returns a reference to the underlying writer.
    pub fn inner(&self) -> &W {
        self.writer.inner()
    }

    /// Returns a mutable reference to the underlying writer.
    ///
    /// **Warning**: if you write directly to this writer, you will skip
    /// the `TrackedWrite` buffering and byte-counting layers. That will cause
    /// the file footer's recorded offsets and sizes to diverge from reality,
    /// resulting in an unreadable or corrupted Parquet file.
    ///
    /// If you want to write safely to the underlying writer, use [`Self::write_all`].
    pub fn inner_mut(&mut self) -> &mut W {
        self.writer.inner_mut()
    }

    /// Flushes any outstanding data and returns the underlying writer.
    pub fn into_inner(mut self) -> Result<W> {
        self.flush()?;
        self.writer.into_inner()
    }

    /// Close and finalize the underlying Parquet writer
    ///
    /// Unlike [`Self::close`] this does not consume self
    ///
    /// Attempting to write after calling finish will result in an error
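    ///
    /// For example (a sketch; `finish` finalizes the file, so later writes error):
    /// ```no_run
    /// # use arrow_array::RecordBatch;
    /// # use parquet::arrow::ArrowWriter;
    /// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
    /// # let batch: RecordBatch = todo!();
    /// writer.write(&batch).unwrap();
    /// let _metadata = writer.finish().unwrap();
    /// // the file footer has been written; subsequent writes fail
    /// assert!(writer.write(&batch).is_err());
    /// ```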
    pub fn finish(&mut self) -> Result<ParquetMetaData> {
        self.flush()?;
        self.writer.finish()
    }

    /// Close and finalize the underlying Parquet writer
    pub fn close(mut self) -> Result<ParquetMetaData> {
        self.finish()
    }

    /// Create a new row group writer and return its column writers.
    #[deprecated(
        since = "56.2.0",
        note = "Use `ArrowRowGroupWriterFactory` instead, see `ArrowColumnWriter` for an example"
    )]
    pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
        self.flush()?;
        let in_progress = self
            .row_group_writer_factory
            .create_row_group_writer(self.writer.flushed_row_groups().len())?;
        Ok(in_progress.writers)
    }

    /// Append the given column chunks to the file as a new row group.
    #[deprecated(
        since = "56.2.0",
        note = "Use `SerializedFileWriter` directly instead, see `ArrowColumnWriter` for an example"
    )]
    pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
        let mut row_group_writer = self.writer.next_row_group()?;
        for chunk in chunks {
            chunk.append_to_row_group(&mut row_group_writer)?;
        }
        row_group_writer.close()?;
        Ok(())
    }

    /// Converts this writer into a lower-level [`SerializedFileWriter`] and [`ArrowRowGroupWriterFactory`].
    ///
    /// Flushes any outstanding data before returning.
    ///
    /// This can be useful to provide more control over how files are written, for example
    /// to write columns in parallel. See the example on [`ArrowColumnWriter`].
    pub fn into_serialized_writer(
        mut self,
    ) -> Result<(SerializedFileWriter<W>, ArrowRowGroupWriterFactory)> {
        self.flush()?;
        Ok((self.writer, self.row_group_writer_factory))
    }
}

impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch).map_err(|e| e.into())
    }

    fn close(self) -> std::result::Result<(), ArrowError> {
        self.close()?;
        Ok(())
    }
}

/// Arrow-specific configuration settings for writing parquet files.
///
/// See [`ArrowWriter`] for how to configure the writer.
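///
/// For example, a writer configured to skip embedding the Arrow schema,
/// using only the options defined below:
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
/// # use parquet::arrow::arrow_writer::{ArrowWriter, ArrowWriterOptions};
/// # use parquet::file::properties::WriterProperties;
/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
/// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
///
/// let options = ArrowWriterOptions::new()
///     .with_properties(WriterProperties::default())
///     .with_skip_arrow_metadata(true);
///
/// let mut buffer = Vec::new();
/// let mut writer =
///     ArrowWriter::try_new_with_options(&mut buffer, batch.schema(), options).unwrap();
/// writer.write(&batch).unwrap();
/// writer.close().unwrap();
/// ```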
#[derive(Debug, Clone, Default)]
pub struct ArrowWriterOptions {
    properties: WriterProperties,
    skip_arrow_metadata: bool,
    schema_root: Option<String>,
    schema_descr: Option<SchemaDescriptor>,
}

impl ArrowWriterOptions {
    /// Creates a new [`ArrowWriterOptions`] with the default settings.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the [`WriterProperties`] for writing parquet files.
    pub fn with_properties(self, properties: WriterProperties) -> Self {
        Self { properties, ..self }
    }

    /// Skip encoding the embedded arrow metadata (defaults to `false`)
    ///
    /// Parquet files generated by the [`ArrowWriter`] contain embedded arrow schema
    /// by default.
    ///
    /// Set `skip_arrow_metadata` to `true` to skip encoding the embedded metadata.
    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }

    /// Set the name of the root parquet schema element (defaults to `"arrow_schema"`)
    pub fn with_schema_root(self, schema_root: String) -> Self {
        Self {
            schema_root: Some(schema_root),
            ..self
        }
    }

    /// Explicitly specify the Parquet schema to be used
    ///
    /// If omitted (the default), the [`ArrowSchemaConverter`] is used to compute the
    /// Parquet [`SchemaDescriptor`]. This may be used when the [`SchemaDescriptor`] is
    /// already known or must be calculated using custom logic.
    pub fn with_parquet_schema(self, schema_descr: SchemaDescriptor) -> Self {
        Self {
            schema_descr: Some(schema_descr),
            ..self
        }
    }
}

/// A single column chunk produced by [`ArrowColumnWriter`]
#[derive(Default)]
struct ArrowColumnChunkData {
    length: usize,
    data: Vec<Bytes>,
}

impl Length for ArrowColumnChunkData {
    fn len(&self) -> u64 {
        self.length as _
    }
}

impl ChunkReader for ArrowColumnChunkData {
    type T = ArrowColumnChunkReader;

    fn get_read(&self, start: u64) -> Result<Self::T> {
        assert_eq!(start, 0); // Assume append_column writes all data in one-shot
        Ok(ArrowColumnChunkReader(
            self.data.clone().into_iter().peekable(),
        ))
    }

    fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
        unimplemented!()
    }
}

/// A [`Read`] for [`ArrowColumnChunkData`]
struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);

impl Read for ArrowColumnChunkReader {
    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
        let buffer = loop {
            match self.0.peek_mut() {
                Some(b) if b.is_empty() => {
                    self.0.next();
                    continue;
                }
                Some(b) => break b,
                None => return Ok(0),
            }
        };

        let len = buffer.len().min(out.len());
        let b = buffer.split_to(len);
        out[..len].copy_from_slice(&b);
        Ok(len)
    }
}

/// A shared [`ArrowColumnChunkData`]
///
/// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access via
/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows
type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;

#[derive(Default)]
struct ArrowPageWriter {
    buffer: SharedColumnChunk,
    #[cfg(feature = "encryption")]
    page_encryptor: Option<PageEncryptor>,
}

impl ArrowPageWriter {
    #[cfg(feature = "encryption")]
    pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
        self.page_encryptor = page_encryptor;
        self
    }

    #[cfg(feature = "encryption")]
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        self.page_encryptor.as_mut()
    }

    #[cfg(not(feature = "encryption"))]
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        None
    }
}

impl PageWriter for ArrowPageWriter {
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
        let page = match self.page_encryptor_mut() {
            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
            None => page,
        };

        let page_header = page.to_thrift_header()?;
        let header = {
            let mut header = Vec::with_capacity(1024);

            match self.page_encryptor_mut() {
                Some(page_encryptor) => {
                    page_encryptor.encrypt_page_header(&page_header, &mut header)?;
                    if page.compressed_page().is_data_page() {
                        page_encryptor.increment_page();
                    }
                }
                None => {
                    let mut protocol = ThriftCompactOutputProtocol::new(&mut header);
                    page_header.write_thrift(&mut protocol)?;
                }
            };

            Bytes::from(header)
        };

        let mut buf = self.buffer.try_lock().unwrap();

        let data = page.compressed_page().buffer().clone();
        let compressed_size = data.len() + header.len();

        let mut spec = PageWriteSpec::new();
        spec.page_type = page.page_type();
        spec.num_values = page.num_values();
        spec.uncompressed_size = page.uncompressed_size() + header.len();
        spec.offset = buf.length as u64;
        spec.compressed_size = compressed_size;
        spec.bytes_written = compressed_size as u64;

        buf.length += compressed_size;
        buf.data.push(header);
        buf.data.push(data);

        Ok(spec)
    }

    fn close(&mut self) -> Result<()> {
        Ok(())
    }
}

/// A leaf column that can be encoded by [`ArrowColumnWriter`]
#[derive(Debug)]
pub struct ArrowLeafColumn(ArrayLevels);

/// Computes the [`ArrowLeafColumn`] for a potentially nested [`ArrayRef`]
///
/// This function can be used along with [`get_column_writers`] to encode
/// individual columns in parallel. See the example on [`ArrowColumnWriter`].
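///
/// A flat (non-nested) column produces exactly one leaf:
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int32Array};
/// # use arrow_schema::{DataType, Field};
/// # use parquet::arrow::arrow_writer::compute_leaves;
/// let field = Field::new("x", DataType::Int32, false);
/// let array = Arc::new(Int32Array::from_iter_values([1, 2, 3])) as ArrayRef;
/// let leaves = compute_leaves(&field, &array).unwrap();
/// assert_eq!(leaves.len(), 1);
/// ```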
pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
    let levels = calculate_array_levels(array, field)?;
    Ok(levels.into_iter().map(ArrowLeafColumn).collect())
}

/// The data for a single column chunk, see [`ArrowColumnWriter`]
pub struct ArrowColumnChunk {
    data: ArrowColumnChunkData,
    close: ColumnCloseResult,
}

impl std::fmt::Debug for ArrowColumnChunk {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowColumnChunk")
            .field("length", &self.data.length)
            .finish_non_exhaustive()
    }
}

impl ArrowColumnChunk {
    /// Returns the [`ColumnCloseResult`] produced when the chunk was closed.
    ///
    /// Exposes encoding information, collected statistics, and the optional
    /// [`ColumnIndexMetaData`](crate::file::page_index::column_index::ColumnIndexMetaData)
    /// / [`OffsetIndexMetaData`](crate::file::page_index::offset_index::OffsetIndexMetaData)
    /// gathered for the column chunk.
    pub fn close(&self) -> &ColumnCloseResult {
        &self.close
    }

    /// Returns a mutable reference to the [`ColumnCloseResult`].
    ///
    /// This allows callers to mutate the close result before the chunk is
    /// appended to a row group, for example clearing `column_index` or
    /// `bloom_filter` based on a dynamic rule that inspects the encodings and
    /// collected page statistics.
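    ///
    /// A minimal sketch (assuming the public `ColumnCloseResult` fields from
    /// `crate::column::writer`), dropping the bloom filter before appending:
    /// ```no_run
    /// # use parquet::arrow::arrow_writer::ArrowColumnChunk;
    /// # let mut chunk: ArrowColumnChunk = todo!();
    /// // Discard the bloom filter for this chunk
    /// chunk.close_mut().bloom_filter = None;
    /// ```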
    pub fn close_mut(&mut self) -> &mut ColumnCloseResult {
        &mut self.close
    }

    /// Calls [`SerializedRowGroupWriter::append_column`] with this column's data
    pub fn append_to_row_group<W: Write + Send>(
        self,
        writer: &mut SerializedRowGroupWriter<'_, W>,
    ) -> Result<()> {
        writer.append_column(&self.data, self.close)
    }
}

/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
///
/// `ArrowColumnWriter` instances can be created using an [`ArrowRowGroupWriterFactory`].
///
/// Note: This is a low-level interface for applications that require
/// fine-grained control of encoding (e.g. encoding using multiple threads),
/// see [`ArrowWriter`] for a higher-level interface
///
/// # Example: Encoding two Arrow Arrays in Parallel
/// ```
/// // The arrow schema
/// # use std::sync::Arc;
/// # use arrow_array::*;
/// # use arrow_schema::*;
/// # use parquet::arrow::ArrowSchemaConverter;
/// # use parquet::arrow::arrow_writer::{compute_leaves, ArrowColumnChunk, ArrowLeafColumn, ArrowRowGroupWriterFactory};
/// # use parquet::file::properties::WriterProperties;
/// # use parquet::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
/// #
/// let schema = Arc::new(Schema::new(vec![
///     Field::new("i32", DataType::Int32, false),
///     Field::new("f32", DataType::Float32, false),
/// ]));
///
/// // Compute the parquet schema
/// let props = Arc::new(WriterProperties::default());
/// let parquet_schema = ArrowSchemaConverter::new()
///   .with_coerce_types(props.coerce_types())
///   .convert(&schema)
///   .unwrap();
///
/// // Create parquet writer
/// let root_schema = parquet_schema.root_schema_ptr();
/// // write to memory in the example, but this could be a File
/// let mut out = Vec::with_capacity(1024);
/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone())
///   .unwrap();
///
/// // Create a factory for building Arrow column writers
/// let row_group_factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(&schema));
/// // Create column writers for the 0th row group
/// let col_writers = row_group_factory.create_column_writers(0).unwrap();
///
/// // Spawn a worker thread for each column
/// //
/// // Note: This is for demonstration purposes, a thread pool, e.g. rayon or tokio, would be better.
/// // The `map` produces an iterator of `(thread handle, send channel)` tuples.
/// let mut workers: Vec<_> = col_writers
///     .into_iter()
///     .map(|mut col_writer| {
///         let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
///         let handle = std::thread::spawn(move || {
///             // receive Arrays to encode via the channel
///             for col in recv {
///                 col_writer.write(&col)?;
///             }
///             // once the input is complete, close the writer
///             // to return the newly created ArrowColumnChunk
///             col_writer.close()
///         });
///         (handle, send)
///     })
///     .collect();
///
/// // Start row group
/// let mut row_group_writer: SerializedRowGroupWriter<'_, _> = writer
///   .next_row_group()
///   .unwrap();
///
/// // Create some example input columns to encode
/// let to_write = vec![
///     Arc::new(Int32Array::from_iter_values([1, 2, 3])) as _,
///     Arc::new(Float32Array::from_iter_values([1., 45., -1.])) as _,
/// ];
///
/// // Send the input columns to the workers
/// let mut worker_iter = workers.iter_mut();
/// for (arr, field) in to_write.iter().zip(&schema.fields) {
///     for leaves in compute_leaves(field, arr).unwrap() {
///         worker_iter.next().unwrap().1.send(leaves).unwrap();
///     }
/// }
///
/// // Wait for the workers to complete encoding, and append
/// // the resulting column chunks to the row group (and the file)
/// for (handle, send) in workers {
///     drop(send); // Drop send side to signal termination
///     // wait for the worker to send the completed chunk
///     let chunk: ArrowColumnChunk = handle.join().unwrap().unwrap();
///     chunk.append_to_row_group(&mut row_group_writer).unwrap();
/// }
/// // Close the row group which writes to the underlying file
/// row_group_writer.close().unwrap();
///
/// let metadata = writer.close().unwrap();
/// assert_eq!(metadata.file_metadata().num_rows(), 3);
/// ```
pub struct ArrowColumnWriter {
    writer: ArrowColumnWriterImpl,
    chunk: SharedColumnChunk,
}

impl std::fmt::Debug for ArrowColumnWriter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
    }
}

enum ArrowColumnWriterImpl {
    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
    Column(ColumnWriter<'static>),
}

impl ArrowColumnWriter {
    /// Write an [`ArrowLeafColumn`]
    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
        self.write_internal(&col.0)
    }

    /// Write with content-defined chunking, inserting page flushes at chunk boundaries.
    fn write_with_chunker(
        &mut self,
        col: &ArrowLeafColumn,
        chunker: &mut ContentDefinedChunker,
    ) -> Result<()> {
        let levels = &col.0;
        let chunks =
            chunker.get_arrow_chunks(levels.def_levels(), levels.rep_levels(), levels.array())?;

        let num_chunks = chunks.len();
        for (i, chunk) in chunks.iter().enumerate() {
            let chunk_levels = levels.slice_for_chunk(chunk);
            self.write_internal(&chunk_levels)?;

            // Add a page break after each chunk except the last
            if i + 1 < num_chunks {
                match &mut self.writer {
                    ArrowColumnWriterImpl::Column(c) => c.add_data_page()?,
                    ArrowColumnWriterImpl::ByteArray(c) => c.add_data_page()?,
                }
            }
        }
        Ok(())
    }

    fn write_internal(&mut self, levels: &ArrayLevels) -> Result<()> {
        match &mut self.writer {
            ArrowColumnWriterImpl::Column(c) => {
                let leaf = levels.array();
                match leaf.as_any_dictionary_opt() {
                    Some(dictionary) => {
                        let materialized =
                            arrow_select::take::take(dictionary.values(), dictionary.keys(), None)?;
                        write_leaf(c, &materialized, levels)?
                    }
                    None => write_leaf(c, leaf, levels)?,
                };
            }
            ArrowColumnWriterImpl::ByteArray(c) => {
                write_primitive(c, levels.array().as_ref(), levels)?;
            }
        }
        Ok(())
    }

    /// Close this column returning the written [`ArrowColumnChunk`]
    pub fn close(self) -> Result<ArrowColumnChunk> {
        let close = match self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
            ArrowColumnWriterImpl::Column(c) => c.close()?,
        };
        let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
        let data = chunk.into_inner().unwrap();
        Ok(ArrowColumnChunk { data, close })
    }

    /// Returns the estimated total memory usage by the writer.
    ///
    /// Unlike [`Self::get_estimated_total_bytes`], this is an estimate
    /// of the current memory usage and not the anticipated encoded size.
    ///
    /// This includes:
    /// 1. Data buffered in encoded form
    /// 2. Data buffered in un-encoded form (e.g. `usize` dictionary keys)
    ///
    /// This value should be greater than or equal to [`Self::get_estimated_total_bytes`]
    pub fn memory_size(&self) -> usize {
        match &self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
            ArrowColumnWriterImpl::Column(c) => c.memory_size(),
        }
    }

    /// Returns the estimated total encoded bytes for this column writer.
    ///
    /// This includes:
    /// 1. Data buffered in encoded form
    /// 2. An estimate of how large the data buffered in un-encoded form would be once encoded
    ///
    /// This value should be less than or equal to [`Self::memory_size`]
    pub fn get_estimated_total_bytes(&self) -> usize {
        match &self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
            ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
        }
    }
}

/// Encodes [`RecordBatch`] to a parquet row group
///
/// Note: this structure is created internally by [`ArrowRowGroupWriterFactory`];
/// it is not exposed publicly.
///
/// See the example on [`ArrowColumnWriter`] for how to encode columns in parallel
#[derive(Debug)]
struct ArrowRowGroupWriter {
    writers: Vec<ArrowColumnWriter>,
    schema: SchemaRef,
    buffered_rows: usize,
}

impl ArrowRowGroupWriter {
    fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
        Self {
            writers,
            schema: arrow.clone(),
            buffered_rows: 0,
        }
    }

    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        self.buffered_rows += batch.num_rows();
        let mut writers = self.writers.iter_mut();
        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
            for leaf in compute_leaves(field.as_ref(), column)? {
                writers.next().unwrap().write(&leaf)?;
            }
        }
        Ok(())
    }

    fn write_with_chunkers(
        &mut self,
        batch: &RecordBatch,
        chunkers: &mut [ContentDefinedChunker],
    ) -> Result<()> {
        self.buffered_rows += batch.num_rows();
        let mut writers = self.writers.iter_mut();
        let mut chunkers = chunkers.iter_mut();
        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
            for leaf in compute_leaves(field.as_ref(), column)? {
                writers
                    .next()
                    .unwrap()
                    .write_with_chunker(&leaf, chunkers.next().unwrap())?;
            }
        }
        Ok(())
    }

    /// Returns the estimated total encoded bytes for this row group
    fn get_estimated_total_bytes(&self) -> usize {
        self.writers
            .iter()
            .map(|x| x.get_estimated_total_bytes())
            .sum()
    }

    fn close(self) -> Result<Vec<ArrowColumnChunk>> {
        self.writers
            .into_iter()
            .map(|writer| writer.close())
            .collect()
    }
}

/// Factory that creates new column writers for each row group in the Parquet file.
///
/// You can create this structure via [`ArrowWriter::into_serialized_writer`].
/// See the example on [`ArrowColumnWriter`] for how to encode columns in parallel
#[derive(Debug)]
pub struct ArrowRowGroupWriterFactory {
    schema: SchemaDescPtr,
    arrow_schema: SchemaRef,
    props: WriterPropertiesPtr,
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}

impl ArrowRowGroupWriterFactory {
    /// Create a new [`ArrowRowGroupWriterFactory`] for the provided file writer and Arrow schema
    pub fn new<W: Write + Send>(
        file_writer: &SerializedFileWriter<W>,
        arrow_schema: SchemaRef,
    ) -> Self {
        let schema = Arc::clone(file_writer.schema_descr_ptr());
        let props = Arc::clone(file_writer.properties());
        Self {
            schema,
            arrow_schema,
            props,
            #[cfg(feature = "encryption")]
            file_encryptor: file_writer.file_encryptor(),
        }
    }

    fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
        let writers = self.create_column_writers(row_group_index)?;
        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
    }

    /// Create column writers for a new row group, with the given row group index
    pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
        let mut writers = Vec::with_capacity(self.arrow_schema.fields.len());
        let mut leaves = self.schema.columns().iter();
        let column_factory = self.column_writer_factory(row_group_index);
        for field in &self.arrow_schema.fields {
            column_factory.get_arrow_column_writer(
                field.data_type(),
                &self.props,
                &mut leaves,
                &mut writers,
            )?;
        }
        Ok(writers)
    }

    #[cfg(feature = "encryption")]
    fn column_writer_factory(&self, row_group_idx: usize) -> ArrowColumnWriterFactory {
        ArrowColumnWriterFactory::new()
            .with_file_encryptor(row_group_idx, self.file_encryptor.clone())
    }

    #[cfg(not(feature = "encryption"))]
    fn column_writer_factory(&self, _row_group_idx: usize) -> ArrowColumnWriterFactory {
        ArrowColumnWriterFactory::new()
    }
}


/// Returns [`ArrowColumnWriter`]s for each column in a given schema
#[deprecated(since = "57.0.0", note = "Use `ArrowRowGroupWriterFactory` instead")]
pub fn get_column_writers(
    parquet: &SchemaDescriptor,
    props: &WriterPropertiesPtr,
    arrow: &SchemaRef,
) -> Result<Vec<ArrowColumnWriter>> {
    let mut writers = Vec::with_capacity(arrow.fields.len());
    let mut leaves = parquet.columns().iter();
    let column_factory = ArrowColumnWriterFactory::new();
    for field in &arrow.fields {
        column_factory.get_arrow_column_writer(
            field.data_type(),
            props,
            &mut leaves,
            &mut writers,
        )?;
    }
    Ok(writers)
}

/// Creates [`ArrowColumnWriter`] instances
struct ArrowColumnWriterFactory {
    #[cfg(feature = "encryption")]
    row_group_index: usize,
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}

impl ArrowColumnWriterFactory {
    pub fn new() -> Self {
        Self {
            #[cfg(feature = "encryption")]
            row_group_index: 0,
            #[cfg(feature = "encryption")]
            file_encryptor: None,
        }
    }

    #[cfg(feature = "encryption")]
    pub fn with_file_encryptor(
        mut self,
        row_group_index: usize,
        file_encryptor: Option<Arc<FileEncryptor>>,
    ) -> Self {
        self.row_group_index = row_group_index;
        self.file_encryptor = file_encryptor;
        self
    }

    #[cfg(feature = "encryption")]
    fn create_page_writer(
        &self,
        column_descriptor: &ColumnDescPtr,
        column_index: usize,
    ) -> Result<Box<ArrowPageWriter>> {
        let column_path = column_descriptor.path().string();
        let page_encryptor = PageEncryptor::create_if_column_encrypted(
            &self.file_encryptor,
            self.row_group_index,
            column_index,
            &column_path,
        )?;
        Ok(Box::new(
            ArrowPageWriter::default().with_encryptor(page_encryptor),
        ))
    }

    #[cfg(not(feature = "encryption"))]
    fn create_page_writer(
        &self,
        _column_descriptor: &ColumnDescPtr,
        _column_index: usize,
    ) -> Result<Box<ArrowPageWriter>> {
        Ok(Box::<ArrowPageWriter>::default())
    }

    /// Gets an [`ArrowColumnWriter`] for the given `data_type`, consuming the
    /// matching `ColumnDescPtr`s from `leaves` and appending the column writers to `out`
    fn get_arrow_column_writer(
        &self,
        data_type: &ArrowDataType,
        props: &WriterPropertiesPtr,
        leaves: &mut Iter<'_, ColumnDescPtr>,
        out: &mut Vec<ArrowColumnWriter>,
    ) -> Result<()> {
        // Instantiate writers for normal columns
        let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
            let page_writer = self.create_page_writer(desc, out.len())?;
            let chunk = page_writer.buffer.clone();
            let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
            Ok(ArrowColumnWriter {
                chunk,
                writer: ArrowColumnWriterImpl::Column(writer),
            })
        };

        // Instantiate writers for byte arrays (e.g. Utf8, Binary, etc)
        let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
            let page_writer = self.create_page_writer(desc, out.len())?;
            let chunk = page_writer.buffer.clone();
            let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
            Ok(ArrowColumnWriter {
                chunk,
                writer: ArrowColumnWriterImpl::ByteArray(writer),
            })
        };

        match data_type {
            _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
                out.push(col(leaves.next().unwrap())?)
            }
            ArrowDataType::LargeBinary
            | ArrowDataType::Binary
            | ArrowDataType::Utf8
            | ArrowDataType::LargeUtf8
            | ArrowDataType::BinaryView
            | ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
            ArrowDataType::List(f)
            | ArrowDataType::LargeList(f)
            | ArrowDataType::FixedSizeList(f, _)
            | ArrowDataType::ListView(f)
            | ArrowDataType::LargeListView(f) => {
                self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
            }
            ArrowDataType::Struct(fields) => {
                for field in fields {
                    self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
                }
            }
            ArrowDataType::Map(f, _) => match f.data_type() {
                ArrowDataType::Struct(f) => {
                    self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
                    self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
                }
                _ => unreachable!("invalid map type"),
            },
            ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                ArrowDataType::Utf8
                | ArrowDataType::LargeUtf8
                | ArrowDataType::Binary
                | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
                ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
                    out.push(bytes(leaves.next().unwrap())?)
                }
                ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
                _ => out.push(col(leaves.next().unwrap())?),
            },
            _ => {
                return Err(ParquetError::NYI(format!(
                    "Attempting to write an Arrow type {data_type} to parquet that is not yet implemented"
                )));
            }
        }
        Ok(())
    }
}

fn write_leaf(
    writer: &mut ColumnWriter<'_>,
    column: &dyn arrow_array::Array,
    levels: &ArrayLevels,
) -> Result<usize> {
    let indices = levels.non_null_indices();

    match writer {
        // Note: this should match the contents of arrow_to_parquet_type
        ColumnWriter::Int32ColumnWriter(typed) => {
            match column.data_type() {
                ArrowDataType::Null => {
                    let array = Int32Array::new_null(column.len());
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Int8 => {
                    let array: Int32Array = column.as_primitive::<Int8Type>().unary(|x| x as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Int16 => {
                    let array: Int32Array = column.as_primitive::<Int16Type>().unary(|x| x as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Int32 => {
                    write_primitive(typed, column.as_primitive::<Int32Type>().values(), levels)
                }
                ArrowDataType::UInt8 => {
                    let array: Int32Array = column.as_primitive::<UInt8Type>().unary(|x| x as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::UInt16 => {
                    let array: Int32Array = column.as_primitive::<UInt16Type>().unary(|x| x as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::UInt32 => {
                    // follow C++ implementation and use overflow/reinterpret cast from u32 to i32 which will map
                    // `(i32::MAX as u32)..u32::MAX` to `i32::MIN..0`
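                    // e.g. `u32::MAX` (all bits set) is reinterpreted as `-1_i32`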
                    let array = column.as_primitive::<UInt32Type>();
                    write_primitive(typed, array.values().inner().typed_data(), levels)
                }
                ArrowDataType::Date32 => {
                    let array = column.as_primitive::<Date32Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Time32(TimeUnit::Second) => {
                    let array = column.as_primitive::<Time32SecondType>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Time32(TimeUnit::Millisecond) => {
                    let array = column.as_primitive::<Time32MillisecondType>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Date64 => {
                    // If the column is a Date64, truncate its milliseconds to whole
                    // days (86,400,000 ms per day)
                    let array: Int32Array = column
                        .as_primitive::<Date64Type>()
                        .unary(|x| (x / 86_400_000) as _);

                    write_primitive(typed, array.values(), levels)
                }
1355                ArrowDataType::Decimal32(_, _) => {
1356                    let array = column
1357                        .as_primitive::<Decimal32Type>()
1358                        .unary::<_, Int32Type>(|v| v);
1359                    write_primitive(typed, array.values(), levels)
1360                }
1361                ArrowDataType::Decimal64(_, _) => {
1362                    // use the int32 to represent the decimal with low precision
1363                    let array = column
1364                        .as_primitive::<Decimal64Type>()
1365                        .unary::<_, Int32Type>(|v| v as i32);
1366                    write_primitive(typed, array.values(), levels)
1367                }
1368                ArrowDataType::Decimal128(_, _) => {
1369                    // use the int32 to represent the decimal with low precision
1370                    let array = column
1371                        .as_primitive::<Decimal128Type>()
1372                        .unary::<_, Int32Type>(|v| v as i32);
1373                    write_primitive(typed, array.values(), levels)
1374                }
1375                ArrowDataType::Decimal256(_, _) => {
1376                    // use the int32 to represent the decimal with low precision
1377                    let array = column
1378                        .as_primitive::<Decimal256Type>()
1379                        .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1380                    write_primitive(typed, array.values(), levels)
1381                }
1382                d => Err(ParquetError::General(format!("Cannot coerce {d} to I32"))),
1383            }
1384        }
1385        ColumnWriter::BoolColumnWriter(typed) => {
1386            let array = column.as_boolean();
1387            typed.write_batch(
1388                get_bool_array_slice(array, indices).as_slice(),
1389                levels.def_levels(),
1390                levels.rep_levels(),
1391            )
1392        }
1393        ColumnWriter::Int64ColumnWriter(typed) => {
1394            match column.data_type() {
1395                ArrowDataType::Date64 => {
1396                    let array = column
1397                        .as_primitive::<Date64Type>()
1398                        .reinterpret_cast::<Int64Type>();
1399
1400                    write_primitive(typed, array.values(), levels)
1401                }
1402                ArrowDataType::Int64 => {
1403                    let array = column.as_primitive::<Int64Type>();
1404                    write_primitive(typed, array.values(), levels)
1405                }
1406                ArrowDataType::UInt64 => {
1407                    let values = column.as_primitive::<UInt64Type>().values();
1408                    // Follow the C++ implementation and reinterpret (overflow cast) u64 as i64, which maps
1409                    // `(i64::MAX as u64 + 1)..=u64::MAX` to `i64::MIN..=-1`
1410                    let array = values.inner().typed_data::<i64>();
1411                    write_primitive(typed, array, levels)
1412                }
1413                ArrowDataType::Time64(TimeUnit::Microsecond) => {
1414                    let array = column.as_primitive::<Time64MicrosecondType>();
1415                    write_primitive(typed, array.values(), levels)
1416                }
1417                ArrowDataType::Time64(TimeUnit::Nanosecond) => {
1418                    let array = column.as_primitive::<Time64NanosecondType>();
1419                    write_primitive(typed, array.values(), levels)
1420                }
1421                ArrowDataType::Timestamp(unit, _) => match unit {
1422                    TimeUnit::Second => {
1423                        let array = column.as_primitive::<TimestampSecondType>();
1424                        write_primitive(typed, array.values(), levels)
1425                    }
1426                    TimeUnit::Millisecond => {
1427                        let array = column.as_primitive::<TimestampMillisecondType>();
1428                        write_primitive(typed, array.values(), levels)
1429                    }
1430                    TimeUnit::Microsecond => {
1431                        let array = column.as_primitive::<TimestampMicrosecondType>();
1432                        write_primitive(typed, array.values(), levels)
1433                    }
1434                    TimeUnit::Nanosecond => {
1435                        let array = column.as_primitive::<TimestampNanosecondType>();
1436                        write_primitive(typed, array.values(), levels)
1437                    }
1438                },
1439                ArrowDataType::Duration(unit) => match unit {
1440                    TimeUnit::Second => {
1441                        let array = column.as_primitive::<DurationSecondType>();
1442                        write_primitive(typed, array.values(), levels)
1443                    }
1444                    TimeUnit::Millisecond => {
1445                        let array = column.as_primitive::<DurationMillisecondType>();
1446                        write_primitive(typed, array.values(), levels)
1447                    }
1448                    TimeUnit::Microsecond => {
1449                        let array = column.as_primitive::<DurationMicrosecondType>();
1450                        write_primitive(typed, array.values(), levels)
1451                    }
1452                    TimeUnit::Nanosecond => {
1453                        let array = column.as_primitive::<DurationNanosecondType>();
1454                        write_primitive(typed, array.values(), levels)
1455                    }
1456                },
1457                ArrowDataType::Decimal64(_, _) => {
1458                    let array = column
1459                        .as_primitive::<Decimal64Type>()
1460                        .reinterpret_cast::<Int64Type>();
1461                    write_primitive(typed, array.values(), levels)
1462                }
1463                ArrowDataType::Decimal128(_, _) => {
1464                    // a decimal with low enough precision fits in an int64; truncate the wider value
1465                    let array = column
1466                        .as_primitive::<Decimal128Type>()
1467                        .unary::<_, Int64Type>(|v| v as i64);
1468                    write_primitive(typed, array.values(), levels)
1469                }
1470                ArrowDataType::Decimal256(_, _) => {
1471                    // a decimal with low enough precision fits in an int64; truncate the wider value
1472                    let array = column
1473                        .as_primitive::<Decimal256Type>()
1474                        .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1475                    write_primitive(typed, array.values(), levels)
1476                }
1477                d => Err(ParquetError::General(format!("Cannot coerce {d} to I64"))),
1478            }
1479        }
1480        ColumnWriter::Int96ColumnWriter(_typed) => {
1481            unreachable!("Currently unreachable because data type not supported")
1482        }
1483        ColumnWriter::FloatColumnWriter(typed) => {
1484            let array = column.as_primitive::<Float32Type>();
1485            write_primitive(typed, array.values(), levels)
1486        }
1487        ColumnWriter::DoubleColumnWriter(typed) => {
1488            let array = column.as_primitive::<Float64Type>();
1489            write_primitive(typed, array.values(), levels)
1490        }
1491        ColumnWriter::ByteArrayColumnWriter(_) => {
1492            unreachable!("should use ByteArrayWriter")
1493        }
1494        ColumnWriter::FixedLenByteArrayColumnWriter(typed) => {
1495            let bytes = match column.data_type() {
1496                ArrowDataType::Interval(interval_unit) => match interval_unit {
1497                    IntervalUnit::YearMonth => {
1498                        let array = column.as_primitive::<IntervalYearMonthType>();
1499                        get_interval_ym_array_slice(array, indices)
1500                    }
1501                    IntervalUnit::DayTime => {
1502                        let array = column.as_primitive::<IntervalDayTimeType>();
1503                        get_interval_dt_array_slice(array, indices)
1504                    }
1505                    _ => {
1506                        return Err(ParquetError::NYI(format!(
1507                            "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
1508                        )));
1509                    }
1510                },
1511                ArrowDataType::FixedSizeBinary(_) => {
1512                    let array = column.as_fixed_size_binary();
1513                    get_fsb_array_slice(array, indices)
1514                }
1515                ArrowDataType::Decimal32(_, _) => {
1516                    let array = column.as_primitive::<Decimal32Type>();
1517                    get_decimal_32_array_slice(array, indices)
1518                }
1519                ArrowDataType::Decimal64(_, _) => {
1520                    let array = column.as_primitive::<Decimal64Type>();
1521                    get_decimal_64_array_slice(array, indices)
1522                }
1523                ArrowDataType::Decimal128(_, _) => {
1524                    let array = column.as_primitive::<Decimal128Type>();
1525                    get_decimal_128_array_slice(array, indices)
1526                }
1527                ArrowDataType::Decimal256(_, _) => {
1528                    let array = column.as_primitive::<Decimal256Type>();
1529                    get_decimal_256_array_slice(array, indices)
1530                }
1531                ArrowDataType::Float16 => {
1532                    let array = column.as_primitive::<Float16Type>();
1533                    get_float_16_array_slice(array, indices)
1534                }
1535                _ => {
1536                    return Err(ParquetError::NYI(
1537                        "Attempting to write an Arrow type that is not yet implemented".to_string(),
1538                    ));
1539                }
1540            };
1541            typed.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels())
1542        }
1543    }
1544}
1545
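/// Writes the non-null `values` of a single leaf column, together with its
/// definition and repetition levels, via `write_batch_internal`.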
1546fn write_primitive<E: ColumnValueEncoder>(
1547    writer: &mut GenericColumnWriter<E>,
1548    values: &E::Values,
1549    levels: &ArrayLevels,
1550) -> Result<usize> {
1551    writer.write_batch_internal(
1552        values,
1553        Some(levels.non_null_indices()),
1554        levels.def_levels(),
1555        levels.rep_levels(),
1556        None,
1557        None,
1558        None,
1559    )
1560}
1561
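/// Gathers the boolean values at `indices` into a contiguous vector.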
1562fn get_bool_array_slice(array: &arrow_array::BooleanArray, indices: &[usize]) -> Vec<bool> {
1563    let mut values = Vec::with_capacity(indices.len());
1564    for i in indices {
1565        values.push(array.value(*i))
1566    }
1567    values
1568}
1569
1570/// Returns 12-byte values encoding months, days and milliseconds (4 bytes each).
1571/// An Arrow YearMonth interval only stores months, thus only the first 4 bytes are populated.
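/// For example, an interval of 14 months encodes as `0E 00 00 00` (little endian)
/// followed by eight zero bytes.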
1572fn get_interval_ym_array_slice(
1573    array: &arrow_array::IntervalYearMonthArray,
1574    indices: &[usize],
1575) -> Vec<FixedLenByteArray> {
1576    let mut values = Vec::with_capacity(indices.len());
1577    for i in indices {
1578        let mut value = array.value(*i).to_le_bytes().to_vec();
1579        let mut suffix = vec![0; 8];
1580        value.append(&mut suffix);
1581        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1582    }
1583    values
1584}
1585
1586/// Returns 12-byte values encoding months, days and milliseconds (4 bytes each).
1587/// An Arrow DayTime interval only stores days and milliseconds, so the first 4 bytes are not populated.
1588fn get_interval_dt_array_slice(
1589    array: &arrow_array::IntervalDayTimeArray,
1590    indices: &[usize],
1591) -> Vec<FixedLenByteArray> {
1592    let mut values = Vec::with_capacity(indices.len());
1593    for i in indices {
1594        let mut out = [0; 12];
1595        let value = array.value(*i);
1596        out[4..8].copy_from_slice(&value.days.to_le_bytes());
1597        out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1598        values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1599    }
1600    values
1601}
1602
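/// Converts the Decimal32 values at `indices` to big-endian byte arrays,
/// truncated to the minimal width implied by the array's precision (see
/// `decimal_length_from_precision`). The wider decimal helpers below follow
/// the same pattern.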
1603fn get_decimal_32_array_slice(
1604    array: &arrow_array::Decimal32Array,
1605    indices: &[usize],
1606) -> Vec<FixedLenByteArray> {
1607    let mut values = Vec::with_capacity(indices.len());
1608    let size = decimal_length_from_precision(array.precision());
1609    for i in indices {
1610        let as_be_bytes = array.value(*i).to_be_bytes();
1611        let resized_value = as_be_bytes[(4 - size)..].to_vec();
1612        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1613    }
1614    values
1615}
1616
1617fn get_decimal_64_array_slice(
1618    array: &arrow_array::Decimal64Array,
1619    indices: &[usize],
1620) -> Vec<FixedLenByteArray> {
1621    let mut values = Vec::with_capacity(indices.len());
1622    let size = decimal_length_from_precision(array.precision());
1623    for i in indices {
1624        let as_be_bytes = array.value(*i).to_be_bytes();
1625        let resized_value = as_be_bytes[(8 - size)..].to_vec();
1626        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1627    }
1628    values
1629}
1630
1631fn get_decimal_128_array_slice(
1632    array: &arrow_array::Decimal128Array,
1633    indices: &[usize],
1634) -> Vec<FixedLenByteArray> {
1635    let mut values = Vec::with_capacity(indices.len());
1636    let size = decimal_length_from_precision(array.precision());
1637    for i in indices {
1638        let as_be_bytes = array.value(*i).to_be_bytes();
1639        let resized_value = as_be_bytes[(16 - size)..].to_vec();
1640        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1641    }
1642    values
1643}
1644
1645fn get_decimal_256_array_slice(
1646    array: &arrow_array::Decimal256Array,
1647    indices: &[usize],
1648) -> Vec<FixedLenByteArray> {
1649    let mut values = Vec::with_capacity(indices.len());
1650    let size = decimal_length_from_precision(array.precision());
1651    for i in indices {
1652        let as_be_bytes = array.value(*i).to_be_bytes();
1653        let resized_value = as_be_bytes[(32 - size)..].to_vec();
1654        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1655    }
1656    values
1657}
1658
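/// Encodes each selected `f16` value as a 2-byte little-endian byte array.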
1659fn get_float_16_array_slice(
1660    array: &arrow_array::Float16Array,
1661    indices: &[usize],
1662) -> Vec<FixedLenByteArray> {
1663    let mut values = Vec::with_capacity(indices.len());
1664    for i in indices {
1665        let value = array.value(*i).to_le_bytes().to_vec();
1666        values.push(FixedLenByteArray::from(ByteArray::from(value)));
1667    }
1668    values
1669}
1670
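/// Copies the `FixedSizeBinary` values at `indices` into owned byte arrays.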
1671fn get_fsb_array_slice(
1672    array: &arrow_array::FixedSizeBinaryArray,
1673    indices: &[usize],
1674) -> Vec<FixedLenByteArray> {
1675    let mut values = Vec::with_capacity(indices.len());
1676    for i in indices {
1677        let value = array.value(*i).to_vec();
1678        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1679    }
1680    values
1681}
1682
1683#[cfg(test)]
1684mod tests {
1685    use super::*;
1686    use std::collections::HashMap;
1687
1688    use std::fs::File;
1689
1690    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1691    use crate::arrow::{ARROW_SCHEMA_META_KEY, PARQUET_FIELD_ID_META_KEY};
1692    use crate::column::page::{Page, PageReader};
1693    use crate::file::metadata::thrift::PageHeader;
1694    use crate::file::page_index::column_index::ColumnIndexMetaData;
1695    use crate::file::reader::SerializedPageReader;
1696    use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
1697    use crate::schema::types::ColumnPath;
1698    use arrow::datatypes::ToByteSlice;
1699    use arrow::datatypes::{DataType, Schema};
1700    use arrow::error::Result as ArrowResult;
1701    use arrow::util::data_gen::create_random_array;
1702    use arrow::util::pretty::pretty_format_batches;
1703    use arrow::{array::*, buffer::Buffer};
1704    use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer, OffsetBuffer, i256};
1705    use arrow_schema::Fields;
1706    use half::f16;
1707    use num_traits::{FromPrimitive, ToPrimitive};
1708    use tempfile::tempfile;
1709
1710    use crate::basic::Encoding;
1711    use crate::data_type::AsBytes;
1712    use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
1713    use crate::file::properties::{
1714        BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1715    };
1716    use crate::file::serialized_reader::ReadOptionsBuilder;
1717    use crate::file::{
1718        reader::{FileReader, SerializedFileReader},
1719        statistics::Statistics,
1720    };
1721
1722    #[test]
1723    fn arrow_writer() {
1724        // define schema
1725        let schema = Schema::new(vec![
1726            Field::new("a", DataType::Int32, false),
1727            Field::new("b", DataType::Int32, true),
1728        ]);
1729
1730        // create some data
1731        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1732        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1733
1734        // build a record batch
1735        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
1736
1737        roundtrip(batch, Some(SMALL_SIZE / 2));
1738    }
1739
1740    fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1741        let mut buffer = vec![];
1742
1743        let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
1744        writer.write(expected_batch).unwrap();
1745        writer.close().unwrap();
1746
1747        buffer
1748    }
1749
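    // Unlike `close`, `into_inner` flushes any outstanding data and returns the
    // underlying writer.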
1750    fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1751        let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
1752        writer.write(expected_batch).unwrap();
1753        writer.into_inner().unwrap()
1754    }
1755
1756    #[test]
1757    fn roundtrip_bytes() {
1758        // define schema
1759        let schema = Arc::new(Schema::new(vec![
1760            Field::new("a", DataType::Int32, false),
1761            Field::new("b", DataType::Int32, true),
1762        ]));
1763
1764        // create some data
1765        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1766        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1767
1768        // build a record batch
1769        let expected_batch =
1770            RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
1771
1772        for buffer in [
1773            get_bytes_after_close(schema.clone(), &expected_batch),
1774            get_bytes_by_into_inner(schema, &expected_batch),
1775        ] {
1776            let cursor = Bytes::from(buffer);
1777            let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
1778
1779            let actual_batch = record_batch_reader
1780                .next()
1781                .expect("No batch found")
1782                .expect("Unable to get batch");
1783
1784            assert_eq!(expected_batch.schema(), actual_batch.schema());
1785            assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
1786            assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
1787            for i in 0..expected_batch.num_columns() {
1788                let expected_data = expected_batch.column(i).to_data();
1789                let actual_data = actual_batch.column(i).to_data();
1790
1791                assert_eq!(expected_data, actual_data);
1792            }
1793        }
1794    }
1795
1796    #[test]
1797    fn arrow_writer_non_null() {
1798        // define schema
1799        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1800
1801        // create some data
1802        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1803
1804        // build a record batch
1805        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1806
1807        roundtrip(batch, Some(SMALL_SIZE / 2));
1808    }
1809
1810    #[test]
1811    fn arrow_writer_list() {
1812        // define schema
1813        let schema = Schema::new(vec![Field::new(
1814            "a",
1815            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1816            true,
1817        )]);
1818
1819        // create some data
1820        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1821
1822        // Construct a buffer for value offsets, for the nested array:
1823        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
1824        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1825
1826        // Construct a list array from the above two
1827        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1828            DataType::Int32,
1829            false,
1830        ))))
1831        .len(5)
1832        .add_buffer(a_value_offsets)
1833        .add_child_data(a_values.into_data())
1834        .null_bit_buffer(Some(Buffer::from([0b00011011])))
1835        .build()
1836        .unwrap();
1837        let a = ListArray::from(a_list_data);
1838
1839        // build a record batch
1840        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1841
1842        assert_eq!(batch.column(0).null_count(), 1);
1843
1844        // This test fails if the max row group size is less than the batch's length
1845        // see https://github.com/apache/arrow-rs/issues/518
1846        roundtrip(batch, None);
1847    }
1848
1849    #[test]
1850    fn arrow_writer_list_non_null() {
1851        // define schema
1852        let schema = Schema::new(vec![Field::new(
1853            "a",
1854            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1855            false,
1856        )]);
1857
1858        // create some data
1859        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1860
1861        // Construct a buffer for value offsets, for the nested array:
1862        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
1863        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1864
1865        // Construct a list array from the above two
1866        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1867            DataType::Int32,
1868            false,
1869        ))))
1870        .len(5)
1871        .add_buffer(a_value_offsets)
1872        .add_child_data(a_values.into_data())
1873        .build()
1874        .unwrap();
1875        let a = ListArray::from(a_list_data);
1876
1877        // build a record batch
1878        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1879
1880        // This test fails if the max row group size is less than the batch's length
1881        // see https://github.com/apache/arrow-rs/issues/518
1882        assert_eq!(batch.column(0).null_count(), 0);
1883
1884        roundtrip(batch, None);
1885    }
1886
1887    #[test]
1888    fn arrow_writer_list_view() {
1889        let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
1890        let schema = Schema::new(vec![Field::new(
1891            "a",
1892            DataType::ListView(list_field.clone()),
1893            true,
1894        )]);
1895
1896        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
1897        let a = ListViewArray::new(
1898            list_field,
1899            vec![0, 1, 0, 3, 6].into(),
1900            vec![1, 2, 0, 3, 4].into(),
1901            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1902            Some(vec![true, true, false, true, true].into()),
1903        );
1904
1905        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1906
1907        assert_eq!(batch.column(0).null_count(), 1);
1908
1909        roundtrip(batch, None);
1910    }
1911
1912    #[test]
1913    fn arrow_writer_list_view_non_null() {
1914        let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
1915        let schema = Schema::new(vec![Field::new(
1916            "a",
1917            DataType::ListView(list_field.clone()),
1918            false,
1919        )]);
1920
1921        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
1922        let a = ListViewArray::new(
1923            list_field,
1924            vec![0, 1, 0, 3, 6].into(),
1925            vec![1, 2, 0, 3, 4].into(),
1926            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1927            None,
1928        );
1929
1930        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1931
1932        assert_eq!(batch.column(0).null_count(), 0);
1933
1934        roundtrip(batch, None);
1935    }
1936
1937    #[test]
1938    fn arrow_writer_list_view_out_of_order() {
1939        let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
1940        let schema = Schema::new(vec![Field::new(
1941            "a",
1942            DataType::ListView(list_field.clone()),
1943            false,
1944        )]);
1945
1946        // [[1], [2, 3], [], [7, 8, 9, 10], [4, 5, 6]] - out of order offsets
1947        let a = ListViewArray::new(
1948            list_field,
1949            vec![0, 1, 0, 6, 3].into(),
1950            vec![1, 2, 0, 4, 3].into(),
1951            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1952            None,
1953        );
1954
1955        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1956
1957        roundtrip(batch, None);
1958    }
1959
1960    #[test]
1961    fn arrow_writer_large_list_view() {
1962        let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
1963        let schema = Schema::new(vec![Field::new(
1964            "a",
1965            DataType::LargeListView(list_field.clone()),
1966            true,
1967        )]);
1968
1969        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
1970        let a = LargeListViewArray::new(
1971            list_field,
1972            vec![0i64, 1, 0, 3, 6].into(),
1973            vec![1i64, 2, 0, 3, 4].into(),
1974            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1975            Some(vec![true, true, false, true, true].into()),
1976        );
1977
1978        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1979
1980        assert_eq!(batch.column(0).null_count(), 1);
1981
1982        roundtrip(batch, None);
1983    }
1984
1985    #[test]
1986    fn arrow_writer_list_view_with_struct() {
1987        // Test ListView containing Struct: ListView<Struct<Int32, Utf8>>
1988        let struct_fields = Fields::from(vec![
1989            Field::new("id", DataType::Int32, false),
1990            Field::new("name", DataType::Utf8, false),
1991        ]);
1992        let struct_type = DataType::Struct(struct_fields.clone());
1993        let list_field = Arc::new(Field::new("item", struct_type.clone(), false));
1994
1995        let schema = Schema::new(vec![Field::new(
1996            "a",
1997            DataType::ListView(list_field.clone()),
1998            true,
1999        )]);
2000
2001        // Create struct values
2002        let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
2003        let name_array = StringArray::from(vec!["a", "b", "c", "d", "e"]);
2004        let struct_array = StructArray::new(
2005            struct_fields,
2006            vec![Arc::new(id_array), Arc::new(name_array)],
2007            None,
2008        );
2009
2010        // Create ListView: [[{1, "a"}, {2, "b"}], null, [{3, "c"}, {4, "d"}, {5, "e"}]]
2011        let list_view = ListViewArray::new(
2012            list_field,
2013            vec![0, 2, 2].into(), // offsets
2014            vec![2, 0, 3].into(), // sizes
2015            Arc::new(struct_array),
2016            Some(vec![true, false, true].into()),
2017        );
2018
2019        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_view)]).unwrap();
2020
2021        roundtrip(batch, None);
2022    }
2023
2024    #[test]
2025    fn arrow_writer_binary() {
2026        let string_field = Field::new("a", DataType::Utf8, false);
2027        let binary_field = Field::new("b", DataType::Binary, false);
2028        let schema = Schema::new(vec![string_field, binary_field]);
2029
2030        let raw_string_values = vec!["foo", "bar", "baz", "quux"];
2031        let raw_binary_values = [
2032            b"foo".to_vec(),
2033            b"bar".to_vec(),
2034            b"baz".to_vec(),
2035            b"quux".to_vec(),
2036        ];
2037        let raw_binary_value_refs = raw_binary_values
2038            .iter()
2039            .map(|x| x.as_slice())
2040            .collect::<Vec<_>>();
2041
2042        let string_values = StringArray::from(raw_string_values.clone());
2043        let binary_values = BinaryArray::from(raw_binary_value_refs);
2044        let batch = RecordBatch::try_new(
2045            Arc::new(schema),
2046            vec![Arc::new(string_values), Arc::new(binary_values)],
2047        )
2048        .unwrap();
2049
2050        roundtrip(batch, Some(SMALL_SIZE / 2));
2051    }
2052
2053    #[test]
2054    fn arrow_writer_binary_view() {
2055        let string_field = Field::new("a", DataType::Utf8View, false);
2056        let binary_field = Field::new("b", DataType::BinaryView, false);
2057        let nullable_string_field = Field::new("a", DataType::Utf8View, true);
2058        let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
2059
2060        let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
2061        let raw_binary_values = vec![
2062            b"foo".to_vec(),
2063            b"bar".to_vec(),
2064            b"large payload over 12 bytes".to_vec(),
2065            b"lulu".to_vec(),
2066        ];
2067        let nullable_string_values =
2068            vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
2069
2070        let string_view_values = StringViewArray::from(raw_string_values);
2071        let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
2072        let nullable_string_view_values = StringViewArray::from(nullable_string_values);
2073        let batch = RecordBatch::try_new(
2074            Arc::new(schema),
2075            vec![
2076                Arc::new(string_view_values),
2077                Arc::new(binary_view_values),
2078                Arc::new(nullable_string_view_values),
2079            ],
2080        )
2081        .unwrap();
2082
2083        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
2084        roundtrip(batch, None);
2085    }
2086
2087    #[test]
2088    fn arrow_writer_binary_view_long_value() {
2089        let string_field = Field::new("a", DataType::Utf8View, false);
2090        let binary_field = Field::new("b", DataType::BinaryView, false);
2091        let schema = Schema::new(vec![string_field, binary_field]);
2092
2093        // There is special-case validation for long values (128 bytes or more):
2094        // a length of 128 encodes as 0x80 0x00 0x00 0x00 in little endian, which should
2095        // trigger the long-string UTF-8 validation branch in the plain decoder.
2096        let long = "a".repeat(128);
2097        let raw_string_values = vec!["foo", long.as_str(), "bar"];
2098        let raw_binary_values = vec![b"foo".to_vec(), long.as_bytes().to_vec(), b"bar".to_vec()];
2099
2100        let string_view_values: ArrayRef = Arc::new(StringViewArray::from(raw_string_values));
2101        let binary_view_values: ArrayRef =
2102            Arc::new(BinaryViewArray::from_iter_values(raw_binary_values));
2103
2104        one_column_roundtrip(Arc::clone(&string_view_values), false);
2105        one_column_roundtrip(Arc::clone(&binary_view_values), false);
2106
2107        let batch = RecordBatch::try_new(
2108            Arc::new(schema),
2109            vec![string_view_values, binary_view_values],
2110        )
2111        .unwrap();
2112
2113        // Disable dictionary to exercise plain encoding paths in the reader.
2114        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2115            let props = WriterProperties::builder()
2116                .set_writer_version(version)
2117                .set_dictionary_enabled(false)
2118                .build();
2119            roundtrip_opts(&batch, props);
2120        }
2121    }
2122
2123    fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
2124        let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
2125        let schema = Schema::new(vec![decimal_field]);
2126
2127        let decimal_values = vec![10_000, 50_000, 0, -100]
2128            .into_iter()
2129            .map(Some)
2130            .collect::<Decimal128Array>()
2131            .with_precision_and_scale(precision, scale)
2132            .unwrap();
2133
2134        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
2135    }
2136
2137    #[test]
2138    fn arrow_writer_decimal() {
2139        // int32 to store the decimal value
2140        let batch_int32_decimal = get_decimal_batch(5, 2);
2141        roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
2142        // int64 to store the decimal value
2143        let batch_int64_decimal = get_decimal_batch(12, 2);
2144        roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
2145        // fixed_length_byte_array to store the decimal value
2146        let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
2147        roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
2148    }
2149
2150    #[test]
2151    fn arrow_writer_complex() {
2152        // define schema
2153        let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
2154        let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
2155        let struct_field_g = Arc::new(Field::new_list(
2156            "g",
2157            Field::new_list_field(DataType::Int16, true),
2158            false,
2159        ));
2160        let struct_field_h = Arc::new(Field::new_list(
2161            "h",
2162            Field::new_list_field(DataType::Int16, false),
2163            true,
2164        ));
2165        let struct_field_e = Arc::new(Field::new_struct(
2166            "e",
2167            vec![
2168                struct_field_f.clone(),
2169                struct_field_g.clone(),
2170                struct_field_h.clone(),
2171            ],
2172            false,
2173        ));
2174        let schema = Schema::new(vec![
2175            Field::new("a", DataType::Int32, false),
2176            Field::new("b", DataType::Int32, true),
2177            Field::new_struct(
2178                "c",
2179                vec![struct_field_d.clone(), struct_field_e.clone()],
2180                false,
2181            ),
2182        ]);
2183
2184        // create some data
2185        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2186        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2187        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
2188        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
2189
2190        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2191
2192        // Construct a buffer for value offsets, for the nested array:
2193        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
2194        let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2195
2196        // Construct a list array from the above two
2197        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
2198            .len(5)
2199            .add_buffer(g_value_offsets.clone())
2200            .add_child_data(g_value.to_data())
2201            .build()
2202            .unwrap();
2203        let g = ListArray::from(g_list_data);
2204        // The difference between g and h is that h has a null bitmap
2205        let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
2206            .len(5)
2207            .add_buffer(g_value_offsets)
2208            .add_child_data(g_value.to_data())
2209            .null_bit_buffer(Some(Buffer::from([0b00011011])))
2210            .build()
2211            .unwrap();
2212        let h = ListArray::from(h_list_data);
2213
2214        let e = StructArray::from(vec![
2215            (struct_field_f, Arc::new(f) as ArrayRef),
2216            (struct_field_g, Arc::new(g) as ArrayRef),
2217            (struct_field_h, Arc::new(h) as ArrayRef),
2218        ]);
2219
2220        let c = StructArray::from(vec![
2221            (struct_field_d, Arc::new(d) as ArrayRef),
2222            (struct_field_e, Arc::new(e) as ArrayRef),
2223        ]);
2224
2225        // build a record batch
2226        let batch = RecordBatch::try_new(
2227            Arc::new(schema),
2228            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
2229        )
2230        .unwrap();
2231
2232        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
2233        roundtrip(batch, Some(SMALL_SIZE / 3));
2234    }
2235
2236    #[test]
2237    fn arrow_writer_complex_mixed() {
2238        // This test was added while investigating https://github.com/apache/arrow-rs/issues/244.
2239        // It was subsequently fixed while investigating https://github.com/apache/arrow-rs/issues/245.
2240
2241        // define schema
2242        let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
2243        let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
2244        let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
2245        let schema = Schema::new(vec![Field::new(
2246            "some_nested_object",
2247            DataType::Struct(Fields::from(vec![
2248                offset_field.clone(),
2249                partition_field.clone(),
2250                topic_field.clone(),
2251            ])),
2252            false,
2253        )]);
2254
2255        // create some data
2256        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
2257        let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2258        let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
2259
2260        let some_nested_object = StructArray::from(vec![
2261            (offset_field, Arc::new(offset) as ArrayRef),
2262            (partition_field, Arc::new(partition) as ArrayRef),
2263            (topic_field, Arc::new(topic) as ArrayRef),
2264        ]);
2265
2266        // build a record batch
2267        let batch =
2268            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
2269
2270        roundtrip(batch, Some(SMALL_SIZE / 2));
2271    }
2272
2273    #[test]
2274    fn arrow_writer_map() {
2275        // Note: we are using the JSON Arrow reader for brevity
2276        let json_content = r#"
2277        {"stocks":{"long": "$AAA", "short": "$BBB"}}
2278        {"stocks":{"long": null, "long": "$CCC", "short": null}}
2279        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
2280        "#;
2281        let entries_struct_type = DataType::Struct(Fields::from(vec![
2282            Field::new("key", DataType::Utf8, false),
2283            Field::new("value", DataType::Utf8, true),
2284        ]));
2285        let stocks_field = Field::new(
2286            "stocks",
2287            DataType::Map(
2288                Arc::new(Field::new("entries", entries_struct_type, false)),
2289                false,
2290            ),
2291            true,
2292        );
2293        let schema = Arc::new(Schema::new(vec![stocks_field]));
2294        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
2295        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
2296
2297        let batch = reader.next().unwrap().unwrap();
2298        roundtrip(batch, None);
2299    }
2300
2301    #[test]
2302    fn arrow_writer_2_level_struct() {
2303        // tests writing struct<struct<primitive>>
2304        let field_c = Field::new("c", DataType::Int32, true);
2305        let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
2306        let type_a = DataType::Struct(vec![field_b.clone()].into());
2307        let field_a = Field::new("a", type_a, true);
2308        let schema = Schema::new(vec![field_a.clone()]);
2309
2310        // create data
2311        let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
2312        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2313            .len(6)
2314            .null_bit_buffer(Some(Buffer::from([0b00100111])))
2315            .add_child_data(c.into_data())
2316            .build()
2317            .unwrap();
2318        let b = StructArray::from(b_data);
2319        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2320            .len(6)
2321            .null_bit_buffer(Some(Buffer::from([0b00101111])))
2322            .add_child_data(b.into_data())
2323            .build()
2324            .unwrap();
2325        let a = StructArray::from(a_data);
2326
2327        assert_eq!(a.null_count(), 1);
2328        assert_eq!(a.column(0).null_count(), 2);
2329
2330        // build a record batch
2331        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2332
2333        roundtrip(batch, Some(SMALL_SIZE / 2));
2334    }
2335
2336    #[test]
2337    fn arrow_writer_2_level_struct_non_null() {
2338        // tests writing struct<struct<primitive>>
2339        let field_c = Field::new("c", DataType::Int32, false);
2340        let type_b = DataType::Struct(vec![field_c].into());
2341        let field_b = Field::new("b", type_b.clone(), false);
2342        let type_a = DataType::Struct(vec![field_b].into());
2343        let field_a = Field::new("a", type_a.clone(), false);
2344        let schema = Schema::new(vec![field_a]);
2345
2346        // create data
2347        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2348        let b_data = ArrayDataBuilder::new(type_b)
2349            .len(6)
2350            .add_child_data(c.into_data())
2351            .build()
2352            .unwrap();
2353        let b = StructArray::from(b_data);
2354        let a_data = ArrayDataBuilder::new(type_a)
2355            .len(6)
2356            .add_child_data(b.into_data())
2357            .build()
2358            .unwrap();
2359        let a = StructArray::from(a_data);
2360
2361        assert_eq!(a.null_count(), 0);
2362        assert_eq!(a.column(0).null_count(), 0);
2363
2364        // build a record batch
2365        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2366
2367        roundtrip(batch, Some(SMALL_SIZE / 2));
2368    }
2369
2370    #[test]
2371    fn arrow_writer_2_level_struct_mixed_null() {
2372        // tests writing struct<struct<primitive>>
2373        let field_c = Field::new("c", DataType::Int32, false);
2374        let type_b = DataType::Struct(vec![field_c].into());
2375        let field_b = Field::new("b", type_b.clone(), true);
2376        let type_a = DataType::Struct(vec![field_b].into());
2377        let field_a = Field::new("a", type_a.clone(), false);
2378        let schema = Schema::new(vec![field_a]);
2379
2380        // create data
2381        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2382        let b_data = ArrayDataBuilder::new(type_b)
2383            .len(6)
2384            .null_bit_buffer(Some(Buffer::from([0b00100111])))
2385            .add_child_data(c.into_data())
2386            .build()
2387            .unwrap();
2388        let b = StructArray::from(b_data);
2389        // `a` intentionally has no null buffer, to test that this is handled correctly
2390        let a_data = ArrayDataBuilder::new(type_a)
2391            .len(6)
2392            .add_child_data(b.into_data())
2393            .build()
2394            .unwrap();
2395        let a = StructArray::from(a_data);
2396
2397        assert_eq!(a.null_count(), 0);
2398        assert_eq!(a.column(0).null_count(), 2);
2399
2400        // build a record batch
2401        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2402
2403        roundtrip(batch, Some(SMALL_SIZE / 2));
2404    }
2405
2406    #[test]
2407    fn arrow_writer_2_level_struct_mixed_null_2() {
2408        // tests writing struct<struct<primitive>>, where the primitive columns are non-null.
2409        let field_c = Field::new("c", DataType::Int32, false);
2410        let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
2411        let field_e = Field::new(
2412            "e",
2413            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2414            false,
2415        );
2416
2417        let field_b = Field::new(
2418            "b",
2419            DataType::Struct(vec![field_c, field_d, field_e].into()),
2420            false,
2421        );
2422        let type_a = DataType::Struct(vec![field_b.clone()].into());
2423        let field_a = Field::new("a", type_a, true);
2424        let schema = Schema::new(vec![field_a.clone()]);
2425
2426        // create data
2427        let c = Int32Array::from_iter_values(0..6);
2428        let d = FixedSizeBinaryArray::try_from_iter(
2429            ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
2430        )
2431        .expect("four byte values");
2432        let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
2433        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2434            .len(6)
2435            .add_child_data(c.into_data())
2436            .add_child_data(d.into_data())
2437            .add_child_data(e.into_data())
2438            .build()
2439            .unwrap();
2440        let b = StructArray::from(b_data);
2441        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2442            .len(6)
2443            .null_bit_buffer(Some(Buffer::from([0b00100101])))
2444            .add_child_data(b.into_data())
2445            .build()
2446            .unwrap();
2447        let a = StructArray::from(a_data);
2448
2449        assert_eq!(a.null_count(), 3);
2450        assert_eq!(a.column(0).null_count(), 0);
2451
2452        // build a record batch
2453        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2454
2455        roundtrip(batch, Some(SMALL_SIZE / 2));
2456    }
2457
2458    #[test]
2459    fn test_fixed_size_binary_in_dict() {
2460        fn test_fixed_size_binary_in_dict_inner<K>()
2461        where
2462            K: ArrowDictionaryKeyType,
2463            K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
2464            <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
2465        {
2466            let field = Field::new(
2467                "a",
2468                DataType::Dictionary(
2469                    Box::new(K::DATA_TYPE),
2470                    Box::new(DataType::FixedSizeBinary(4)),
2471                ),
2472                false,
2473            );
2474            let schema = Schema::new(vec![field]);
2475
2476            let keys: Vec<K::Native> = vec![
2477                K::Native::try_from(0u8).unwrap(),
2478                K::Native::try_from(0u8).unwrap(),
2479                K::Native::try_from(1u8).unwrap(),
2480            ];
2481            let keys = PrimitiveArray::<K>::from_iter_values(keys);
2482            let values = FixedSizeBinaryArray::try_from_iter(
2483                vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
2484            )
2485            .unwrap();
2486
2487            let data = DictionaryArray::<K>::new(keys, Arc::new(values));
2488            let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
2489            roundtrip(batch, None);
2490        }
2491
2492        test_fixed_size_binary_in_dict_inner::<UInt8Type>();
2493        test_fixed_size_binary_in_dict_inner::<UInt16Type>();
2494        test_fixed_size_binary_in_dict_inner::<UInt32Type>();
2495        test_fixed_size_binary_in_dict_inner::<UInt64Type>();
2496        test_fixed_size_binary_in_dict_inner::<Int8Type>();
2497        test_fixed_size_binary_in_dict_inner::<Int16Type>();
2498        test_fixed_size_binary_in_dict_inner::<Int32Type>();
2499        test_fixed_size_binary_in_dict_inner::<Int64Type>();
2500    }
2501
2502    #[test]
2503    fn test_empty_dict() {
2504        let struct_fields = Fields::from(vec![Field::new(
2505            "dict",
2506            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2507            false,
2508        )]);
2509
2510        let schema = Schema::new(vec![Field::new_struct(
2511            "struct",
2512            struct_fields.clone(),
2513            true,
2514        )]);
2515        let dictionary = Arc::new(DictionaryArray::new(
2516            Int32Array::new_null(5),
2517            Arc::new(StringArray::new_null(0)),
2518        ));
2519
2520        let s = StructArray::new(
2521            struct_fields,
2522            vec![dictionary],
2523            Some(NullBuffer::new_null(5)),
2524        );
2525
2526        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
2527        roundtrip(batch, None);
2528    }

2529    #[test]
2530    fn arrow_writer_page_size() {
2531        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
2532
2533        let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
2534
2535        // Generate an array of 10 unique 10-character strings
2536        for i in 0..10 {
2537            let value = i
2538                .to_string()
2539                .repeat(10)
2540                .chars()
2541                .take(10)
2542                .collect::<String>();
2543
2544            builder.append_value(value);
2545        }
2546
2547        let array = Arc::new(builder.finish());
2548
2549        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
2550
2551        let file = tempfile::tempfile().unwrap();
2552
2553        // Set everything very low so we fall back to PLAIN encoding after the first row
2554        let props = WriterProperties::builder()
2555            .set_data_page_size_limit(1)
2556            .set_dictionary_page_size_limit(1)
2557            .set_write_batch_size(1)
2558            .build();
2559
2560        let mut writer =
2561            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
2562                .expect("Unable to write file");
2563        writer.write(&batch).unwrap();
2564        writer.close().unwrap();
2565
2566        let options = ReadOptionsBuilder::new().with_page_index().build();
2567        let reader =
2568            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap();
2569
2570        let column = reader.metadata().row_group(0).columns();
2571
2572        assert_eq!(column.len(), 1);
2573
2574        // We should write one row before falling back to PLAIN encoding so there should still be a
2575        // dictionary page.
2576        assert!(
2577            column[0].dictionary_page_offset().is_some(),
2578            "Expected a dictionary page"
2579        );
2580
2581        assert!(reader.metadata().offset_index().is_some());
2582        let offset_indexes = &reader.metadata().offset_index().unwrap()[0];
2583
2584        let page_locations = offset_indexes[0].page_locations.clone();
2585
2586        // We should fall back to PLAIN encoding after the first row, and our max page size is 1 byte,
2587        // so we expect one dictionary-encoded page and then a page per row thereafter.
2588        assert_eq!(
2589            page_locations.len(),
2590            10,
2591            "Expected 10 pages but got {page_locations:#?}"
2592        );
2593    }
2594
2595    #[test]
2596    fn arrow_writer_float_nans() {
2597        let f16_field = Field::new("a", DataType::Float16, false);
2598        let f32_field = Field::new("b", DataType::Float32, false);
2599        let f64_field = Field::new("c", DataType::Float64, false);
2600        let schema = Schema::new(vec![f16_field, f32_field, f64_field]);
2601
2602        let f16_values = (0..MEDIUM_SIZE)
2603            .map(|i| {
2604                Some(if i % 2 == 0 {
2605                    f16::NAN
2606                } else {
2607                    f16::from_f32(i as f32)
2608                })
2609            })
2610            .collect::<Float16Array>();
2611
2612        let f32_values = (0..MEDIUM_SIZE)
2613            .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
2614            .collect::<Float32Array>();
2615
2616        let f64_values = (0..MEDIUM_SIZE)
2617            .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
2618            .collect::<Float64Array>();
2619
2620        let batch = RecordBatch::try_new(
2621            Arc::new(schema),
2622            vec![
2623                Arc::new(f16_values),
2624                Arc::new(f32_values),
2625                Arc::new(f64_values),
2626            ],
2627        )
2628        .unwrap();
2629
2630        roundtrip(batch, None);
2631    }
2632
2633    const SMALL_SIZE: usize = 7;
2634    const MEDIUM_SIZE: usize = 63;
2635
2636    // Write the batch to parquet and read it back out, ensuring
2637    // that what comes out is the same as what was written
2638    fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<Bytes> {
2639        let mut files = vec![];
2640        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2641            let mut props = WriterProperties::builder().set_writer_version(version);
2642
2643            if let Some(size) = max_row_group_size {
2644                props = props.set_max_row_group_row_count(Some(size))
2645            }
2646
2647            let props = props.build();
2648            files.push(roundtrip_opts(&expected_batch, props))
2649        }
2650        files
2651    }
2652
2653    // Round trip the specified record batch with the specified writer properties,
2654    // to an in-memory file, and validate the arrays using the specified function.
2655    // Returns the in-memory file.
2656    fn roundtrip_opts_with_array_validation<F>(
2657        expected_batch: &RecordBatch,
2658        props: WriterProperties,
2659        validate: F,
2660    ) -> Bytes
2661    where
2662        F: Fn(&ArrayData, &ArrayData),
2663    {
2664        let mut file = vec![];
2665
2666        let mut writer = ArrowWriter::try_new(&mut file, expected_batch.schema(), Some(props))
2667            .expect("Unable to write file");
2668        writer.write(expected_batch).unwrap();
2669        writer.close().unwrap();
2670
2671        let file = Bytes::from(file);
2672        let mut record_batch_reader =
2673            ParquetRecordBatchReader::try_new(file.clone(), 1024).unwrap();
2674
2675        let actual_batch = record_batch_reader
2676            .next()
2677            .expect("No batch found")
2678            .expect("Unable to get batch");
2679
2680        assert_eq!(expected_batch.schema(), actual_batch.schema());
2681        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
2682        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
2683        for i in 0..expected_batch.num_columns() {
2684            let expected_data = expected_batch.column(i).to_data();
2685            let actual_data = actual_batch.column(i).to_data();
2686            validate(&expected_data, &actual_data);
2687        }
2688
2689        file
2690    }
2691
2692    fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> Bytes {
2693        roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
2694            a.validate_full().expect("valid expected data");
2695            b.validate_full().expect("valid actual data");
2696            assert_eq!(a, b)
2697        })
2698    }
2699
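    /// Options for the single-column roundtrip helpers; the bloom filter fields
    /// only take effect when `bloom_filter` is true.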
2700    struct RoundTripOptions {
2701        values: ArrayRef,
2702        schema: SchemaRef,
2703        bloom_filter: bool,
2704        bloom_filter_ndv: Option<u64>,
2705        bloom_filter_position: BloomFilterPosition,
2706    }
2707
2708    impl RoundTripOptions {
2709        fn new(values: ArrayRef, nullable: bool) -> Self {
2710            let data_type = values.data_type().clone();
2711            let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
2712            Self {
2713                values,
2714                schema: Arc::new(schema),
2715                bloom_filter: false,
2716                bloom_filter_ndv: None,
2717                bloom_filter_position: BloomFilterPosition::AfterRowGroup,
2718            }
2719        }
2720    }
2721
2722    fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<Bytes> {
2723        one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
2724    }
2725
2726    fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<Bytes> {
2727        let mut options = RoundTripOptions::new(values, false);
2728        options.schema = schema;
2729        one_column_roundtrip_with_options(options)
2730    }
2731
2732    fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<Bytes> {
2733        let RoundTripOptions {
2734            values,
2735            schema,
2736            bloom_filter,
2737            bloom_filter_ndv,
2738            bloom_filter_position,
2739        } = options;
2740
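        // Exercise every encoding that supports the column's data type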
2741        let encodings = match values.data_type() {
2742            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
2743                vec![
2744                    Encoding::PLAIN,
2745                    Encoding::DELTA_BYTE_ARRAY,
2746                    Encoding::DELTA_LENGTH_BYTE_ARRAY,
2747                ]
2748            }
2749            DataType::Int64
2750            | DataType::Int32
2751            | DataType::Int16
2752            | DataType::Int8
2753            | DataType::UInt64
2754            | DataType::UInt32
2755            | DataType::UInt16
2756            | DataType::UInt8 => vec![
2757                Encoding::PLAIN,
2758                Encoding::DELTA_BINARY_PACKED,
2759                Encoding::BYTE_STREAM_SPLIT,
2760            ],
2761            DataType::Float32 | DataType::Float64 => {
2762                vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
2763            }
2764            _ => vec![Encoding::PLAIN],
2765        };
2766
2767        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2768
2769        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2770
2771        let mut files = vec![];
2772        for dictionary_size in [0, 1, 1024] {
2773            for encoding in &encodings {
2774                for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2775                    for row_group_size in row_group_sizes {
2776                        let mut builder = WriterProperties::builder()
2777                            .set_writer_version(version)
2778                            .set_max_row_group_row_count(Some(row_group_size))
2779                            .set_dictionary_enabled(dictionary_size != 0)
2780                            .set_dictionary_page_size_limit(dictionary_size.max(1))
2781                            .set_encoding(*encoding)
2782                            .set_bloom_filter_enabled(bloom_filter)
2783                            .set_bloom_filter_position(bloom_filter_position);
2784                        if let Some(ndv) = bloom_filter_ndv {
2785                            builder = builder.set_bloom_filter_ndv(ndv);
2786                        }
2787                        let props = builder.build();
2788
2789                        files.push(roundtrip_opts(&expected_batch, props))
2790                    }
2791                }
2792            }
2793        }
2794        files
2795    }
2796
2797    fn values_required<A, I>(iter: I) -> Vec<Bytes>
2798    where
2799        A: From<Vec<I::Item>> + Array + 'static,
2800        I: IntoIterator,
2801    {
2802        let raw_values: Vec<_> = iter.into_iter().collect();
2803        let values = Arc::new(A::from(raw_values));
2804        one_column_roundtrip(values, false)
2805    }
2806
2807    fn values_optional<A, I>(iter: I) -> Vec<Bytes>
2808    where
2809        A: From<Vec<Option<I::Item>>> + Array + 'static,
2810        I: IntoIterator,
2811    {
2812        let optional_raw_values: Vec<_> = iter
2813            .into_iter()
2814            .enumerate()
2815            .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
2816            .collect();
2817        let optional_values = Arc::new(A::from(optional_raw_values));
2818        one_column_roundtrip(optional_values, true)
2819    }
2820
2821    fn required_and_optional<A, I>(iter: I)
2822    where
2823        A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
2824        I: IntoIterator + Clone,
2825    {
2826        values_required::<A, I>(iter.clone());
2827        values_optional::<A, I>(iter);
2828    }
2829
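    /// Asserts that every `positive_values` entry hits some row group's bloom
    /// filter, and that no `negative_values` entry does. Only the first file of
    /// the matrix is inspected.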
2830    fn check_bloom_filter<T: AsBytes>(
2831        files: Vec<Bytes>,
2832        file_column: String,
2833        positive_values: Vec<T>,
2834        negative_values: Vec<T>,
2835    ) {
2836        files.into_iter().take(1).for_each(|file| {
2837            let file_reader = SerializedFileReader::new_with_options(
2838                file,
2839                ReadOptionsBuilder::new()
2840                    .with_reader_properties(
2841                        ReaderProperties::builder()
2842                            .set_read_bloom_filter(true)
2843                            .build(),
2844                    )
2845                    .build(),
2846            )
2847            .expect("Unable to open file as Parquet");
2848            let metadata = file_reader.metadata();
2849
2850            // Gets bloom filters from all row groups.
2851            let mut bloom_filters: Vec<_> = vec![];
2852            for (ri, row_group) in metadata.row_groups().iter().enumerate() {
2853                if let Some((column_index, _)) = row_group
2854                    .columns()
2855                    .iter()
2856                    .enumerate()
2857                    .find(|(_, column)| column.column_path().string() == file_column)
2858                {
2859                    let row_group_reader = file_reader
2860                        .get_row_group(ri)
2861                        .expect("Unable to read row group");
2862                    if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
2863                        bloom_filters.push(sbbf.clone());
2864                    } else {
2865                        panic!("No bloom filter for column named {file_column} found");
2866                    }
2867                } else {
2868                    panic!("No column named {file_column} found");
2869                }
2870            }
2871
2872            positive_values.iter().for_each(|value| {
2873                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2874                assert!(
2875                    found.is_some(),
2876                    "Value {:?} should be in bloom filter",
2877                    value.as_bytes()
2878                );
2879            });
2880
2881            negative_values.iter().for_each(|value| {
2882                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2883                assert!(
2884                    found.is_none(),
2885                    "Value {:?} should not be in bloom filter",
2886                    value.as_bytes()
2887                );
2888            });
2889        });
2890    }
2891
2892    #[test]
2893    fn all_null_primitive_single_column() {
2894        let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
2895        one_column_roundtrip(values, true);
2896    }

2897    #[test]
2898    fn null_single_column() {
2899        let values = Arc::new(NullArray::new(SMALL_SIZE));
2900        one_column_roundtrip(values, true);
2901        // null arrays are always nullable; a test with a non-nullable Null column fails
2902    }
2903
2904    #[test]
2905    fn bool_single_column() {
2906        required_and_optional::<BooleanArray, _>(
2907            [true, false].iter().cycle().copied().take(SMALL_SIZE),
2908        );
2909    }
2910
2911    #[test]
2912    fn bool_large_single_column() {
2913        let values = Arc::new(
2914            [None, Some(true), Some(false)]
2915                .iter()
2916                .cycle()
2917                .copied()
2918                .take(200_000)
2919                .collect::<BooleanArray>(),
2920        );
2921        let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
2922        let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2923        let file = tempfile::tempfile().unwrap();
2924
2925        let mut writer =
2926            ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
2927                .expect("Unable to write file");
2928        writer.write(&expected_batch).unwrap();
2929        writer.close().unwrap();
2930    }
2931
2932    #[test]
2933    fn check_page_offset_index_with_nan() {
2934        let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
2935        let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
2936        let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2937
2938        let mut out = Vec::with_capacity(1024);
2939        let mut writer =
2940            ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
2941        writer.write(&batch).unwrap();
2942        let file_meta_data = writer.close().unwrap();
2943        for row_group in file_meta_data.row_groups() {
2944            for column in row_group.columns() {
2945                assert!(column.offset_index_offset().is_some());
2946                assert!(column.offset_index_length().is_some());
2947                assert!(column.column_index_offset().is_none());
2948                assert!(column.column_index_length().is_none());
2949            }
2950        }
2951    }
2952
2953    #[test]
2954    fn i8_single_column() {
2955        required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
2956    }
2957
2958    #[test]
2959    fn i16_single_column() {
2960        required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
2961    }
2962
2963    #[test]
2964    fn i32_single_column() {
2965        required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
2966    }
2967
2968    #[test]
2969    fn i64_single_column() {
2970        required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
2971    }
2972
2973    #[test]
2974    fn u8_single_column() {
2975        required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
2976    }
2977
2978    #[test]
2979    fn u16_single_column() {
2980        required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
2981    }
2982
2983    #[test]
2984    fn u32_single_column() {
2985        required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
2986    }
2987
2988    #[test]
2989    fn u64_single_column() {
2990        required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
2991    }
2992
2993    #[test]
2994    fn f32_single_column() {
2995        required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
2996    }
2997
2998    #[test]
2999    fn f64_single_column() {
3000        required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
3001    }
3002
3003    // The timestamp array types don't implement From<Vec<T>> because they need the timezone
3004    // argument, and they also don't support building from a Vec<Option<T>>, so call
3005    // one_column_roundtrip manually instead of calling required_and_optional for these tests.
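    // For timezone-aware variants the same manual pattern applies, e.g. (a sketch,
    // assuming arrow_array's `with_timezone` builder method):
    //
    //     let values = TimestampMillisecondArray::from(raw_values).with_timezone("+00:00");
    //     one_column_roundtrip(Arc::new(values), false);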
3006
3007    #[test]
3008    fn timestamp_second_single_column() {
3009        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
3010        let values = Arc::new(TimestampSecondArray::from(raw_values));
3011
3012        one_column_roundtrip(values, false);
3013    }
3014
3015    #[test]
3016    fn timestamp_millisecond_single_column() {
3017        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
3018        let values = Arc::new(TimestampMillisecondArray::from(raw_values));
3019
3020        one_column_roundtrip(values, false);
3021    }
3022
3023    #[test]
3024    fn timestamp_microsecond_single_column() {
3025        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
3026        let values = Arc::new(TimestampMicrosecondArray::from(raw_values));
3027
3028        one_column_roundtrip(values, false);
3029    }
3030
3031    #[test]
3032    fn timestamp_nanosecond_single_column() {
3033        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
3034        let values = Arc::new(TimestampNanosecondArray::from(raw_values));
3035
3036        one_column_roundtrip(values, false);
3037    }
3038
3039    #[test]
3040    fn date32_single_column() {
3041        required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
3042    }
3043
3044    #[test]
3045    fn date64_single_column() {
3046        // Date64 must be a multiple of 86400000, see ARROW-10925
3047        required_and_optional::<Date64Array, _>(
3048            (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
3049        );
3050    }
3051
3052    #[test]
3053    fn time32_second_single_column() {
3054        required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
3055    }
3056
3057    #[test]
3058    fn time32_millisecond_single_column() {
3059        required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
3060    }
3061
3062    #[test]
3063    fn time64_microsecond_single_column() {
3064        required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
3065    }
3066
3067    #[test]
3068    fn time64_nanosecond_single_column() {
3069        required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
3070    }
3071
3072    #[test]
3073    fn duration_second_single_column() {
3074        required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
3075    }
3076
3077    #[test]
3078    fn duration_millisecond_single_column() {
3079        required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
3080    }
3081
3082    #[test]
3083    fn duration_microsecond_single_column() {
3084        required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
3085    }
3086
3087    #[test]
3088    fn duration_nanosecond_single_column() {
3089        required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
3090    }
3091
3092    #[test]
3093    fn interval_year_month_single_column() {
3094        required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
3095    }
3096
3097    #[test]
3098    fn interval_day_time_single_column() {
3099        required_and_optional::<IntervalDayTimeArray, _>(vec![
3100            IntervalDayTime::new(0, 1),
3101            IntervalDayTime::new(0, 3),
3102            IntervalDayTime::new(3, -2),
3103            IntervalDayTime::new(-200, 4),
3104        ]);
3105    }
3106
3107    #[test]
3108    #[should_panic(
3109        expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
3110    )]
3111    fn interval_month_day_nano_single_column() {
3112        required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
3113            IntervalMonthDayNano::new(0, 1, 5),
3114            IntervalMonthDayNano::new(0, 3, 2),
3115            IntervalMonthDayNano::new(3, -2, -5),
3116            IntervalMonthDayNano::new(-200, 4, -1),
3117        ]);
3118    }
3119
3120    #[test]
3121    fn binary_single_column() {
3122        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
3123        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
3124        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
3125
3126        // BinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
3127        values_required::<BinaryArray, _>(many_vecs_iter);
3128    }
3129
3130    #[test]
3131    fn binary_view_single_column() {
3132        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
3133        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
3134        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
3135
3136        // BinaryViewArrays can't be built from Vec<Option<&str>>, so only call `values_required`
3137        values_required::<BinaryViewArray, _>(many_vecs_iter);
3138    }
3139
3140    #[test]
3141    fn i32_column_bloom_filter_at_end() {
3142        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
3143        let mut options = RoundTripOptions::new(array, false);
3144        options.bloom_filter = true;
3145        options.bloom_filter_position = BloomFilterPosition::End;
3146
3147        let files = one_column_roundtrip_with_options(options);
3148        check_bloom_filter(
3149            files,
3150            "col".to_string(),
3151            (0..SMALL_SIZE as i32).collect(),
3152            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
3153        );
3154    }
3155
3156    #[test]
3157    fn i32_column_bloom_filter() {
3158        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
3159        let mut options = RoundTripOptions::new(array, false);
3160        options.bloom_filter = true;
3161
3162        let files = one_column_roundtrip_with_options(options);
3163        check_bloom_filter(
3164            files,
3165            "col".to_string(),
3166            (0..SMALL_SIZE as i32).collect(),
3167            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
3168        );
3169    }
3170
3171    /// Test that bloom filter folding produces correct results even when
3172    /// the configured NDV differs significantly from actual NDV.
3173    /// A large NDV means a larger initial filter that gets folded down;
3174    /// a small NDV means a smaller initial filter.
3175    #[test]
3176    fn i32_column_bloom_filter_fixed_ndv() {
3177        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
3178
3179        // NDV much larger than the actual distinct count: exercises folding a large filter down
3180        let mut options = RoundTripOptions::new(array.clone(), false);
3181        options.bloom_filter = true;
3182        options.bloom_filter_ndv = Some(1_000_000);
3183
3184        let files = one_column_roundtrip_with_options(options);
3185        check_bloom_filter(
3186            files,
3187            "col".to_string(),
3188            (0..SMALL_SIZE as i32).collect(),
3189            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
3190        );
3191
3192        // NDV smaller than the actual distinct count: exercises the underestimate path
3193        let mut options = RoundTripOptions::new(array, false);
3194        options.bloom_filter = true;
3195        options.bloom_filter_ndv = Some(3);
3196
3197        let files = one_column_roundtrip_with_options(options);
3198        check_bloom_filter(
3199            files,
3200            "col".to_string(),
3201            (0..SMALL_SIZE as i32).collect(),
3202            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
3203        );
3204    }
3205
3206    #[test]
3207    fn binary_column_bloom_filter() {
3208        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
3209        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
3210        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
3211
3212        let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
3213        let mut options = RoundTripOptions::new(array, false);
3214        options.bloom_filter = true;
3215
3216        let files = one_column_roundtrip_with_options(options);
3217        check_bloom_filter(
3218            files,
3219            "col".to_string(),
3220            many_vecs,
3221            vec![vec![(SMALL_SIZE + 1) as u8]],
3222        );
3223    }
3224
3225    #[test]
3226    fn empty_string_null_column_bloom_filter() {
3227        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
3228        let raw_strs = raw_values.iter().map(|s| s.as_str());
3229
3230        let array = Arc::new(StringArray::from_iter_values(raw_strs));
3231        let mut options = RoundTripOptions::new(array, false);
3232        options.bloom_filter = true;
3233
3234        let files = one_column_roundtrip_with_options(options);
3235
3236        let optional_raw_values: Vec<_> = raw_values
3237            .iter()
3238            .enumerate()
3239            .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
3240            .collect();
3241        // For null slots, empty string should not be in bloom filter.
3242        check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
3243    }
3244
3245    #[test]
3246    fn large_binary_single_column() {
3247        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
3248        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
3249        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
3250
3251        // LargeBinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
3252        values_required::<LargeBinaryArray, _>(many_vecs_iter);
3253    }
3254
3255    #[test]
3256    fn fixed_size_binary_single_column() {
3257        let mut builder = FixedSizeBinaryBuilder::new(4);
3258        builder.append_value(b"0123").unwrap();
3259        builder.append_null();
3260        builder.append_value(b"8910").unwrap();
3261        builder.append_value(b"1112").unwrap();
3262        let array = Arc::new(builder.finish());
3263
3264        one_column_roundtrip(array, true);
3265    }
3266
3267    #[test]
3268    fn string_single_column() {
3269        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
3270        let raw_strs = raw_values.iter().map(|s| s.as_str());
3271
3272        required_and_optional::<StringArray, _>(raw_strs);
3273    }
3274
3275    #[test]
3276    fn large_string_single_column() {
3277        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
3278        let raw_strs = raw_values.iter().map(|s| s.as_str());
3279
3280        required_and_optional::<LargeStringArray, _>(raw_strs);
3281    }
3282
3283    #[test]
3284    fn string_view_single_column() {
3285        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
3286        let raw_strs = raw_values.iter().map(|s| s.as_str());
3287
3288        required_and_optional::<StringViewArray, _>(raw_strs);
3289    }
3290
3291    #[test]
3292    fn null_list_single_column() {
3293        let null_field = Field::new_list_field(DataType::Null, true);
3294        let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);
3295
3296        let schema = Schema::new(vec![list_field]);
3297
3298        // Build [[], null, [null, null]]
3299        let a_values = NullArray::new(2);
3300        let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
3301        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
3302            DataType::Null,
3303            true,
3304        ))))
3305        .len(3)
3306        .add_buffer(a_value_offsets)
3307        .null_bit_buffer(Some(Buffer::from([0b00000101])))
3308        .add_child_data(a_values.into_data())
3309        .build()
3310        .unwrap();
3311
3312        let a = ListArray::from(a_list_data);
3313
3314        assert!(a.is_valid(0));
3315        assert!(!a.is_valid(1));
3316        assert!(a.is_valid(2));
3317
3318        assert_eq!(a.value(0).len(), 0);
3319        assert_eq!(a.value(2).len(), 2);
3320        assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);
3321
3322        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
3323        roundtrip(batch, None);
3324    }
3325
3326    #[test]
3327    fn list_single_column() {
3328        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3329        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
3330        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
3331            DataType::Int32,
3332            false,
3333        ))))
3334        .len(5)
3335        .add_buffer(a_value_offsets)
3336        .null_bit_buffer(Some(Buffer::from([0b00011011])))
3337        .add_child_data(a_values.into_data())
3338        .build()
3339        .unwrap();
3340
3341        assert_eq!(a_list_data.null_count(), 1);
3342
3343        let a = ListArray::from(a_list_data);
3344        let values = Arc::new(a);
3345
3346        one_column_roundtrip(values, true);
3347    }
3348
3349    #[test]
3350    fn large_list_single_column() {
3351        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3352        let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
3353        let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
3354            "large_item",
3355            DataType::Int32,
3356            true,
3357        ))))
3358        .len(5)
3359        .add_buffer(a_value_offsets)
3360        .add_child_data(a_values.into_data())
3361        .null_bit_buffer(Some(Buffer::from([0b00011011])))
3362        .build()
3363        .unwrap();
3364
3365        // The null_bit_buffer (0b00011011) leaves slot 2 unset, so exactly one list entry is null
3366        assert_eq!(a_list_data.null_count(), 1);
3367
3368        let a = LargeListArray::from(a_list_data);
3369        let values = Arc::new(a);
3370
3371        one_column_roundtrip(values, true);
3372    }
3373
3374    #[test]
3375    fn list_nested_nulls() {
3376        use arrow::datatypes::Int32Type;
3377        let data = vec![
3378            Some(vec![Some(1)]),
3379            Some(vec![Some(2), Some(3)]),
3380            None,
3381            Some(vec![Some(4), Some(5), None]),
3382            Some(vec![None]),
3383            Some(vec![Some(6), Some(7)]),
3384        ];
3385
3386        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
3387        one_column_roundtrip(Arc::new(list), true);
3388
3389        let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
3390        one_column_roundtrip(Arc::new(list), true);
3391    }
3392
3393    #[test]
3394    fn struct_single_column() {
3395        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3396        let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
3397        let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
3398
3399        let values = Arc::new(s);
3400        one_column_roundtrip(values, false);
3401    }
3402
3403    #[test]
3404    fn list_and_map_coerced_names() {
3405        // Create map and list with non-Parquet naming
3406        let list_field =
3407            Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
3408        let map_field = Field::new_map(
3409            "my_map",
3410            "entries",
3411            Field::new("keys", DataType::Int32, false),
3412            Field::new("values", DataType::Int32, true),
3413            false,
3414            true,
3415        );
3416
3417        let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
3418        let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();
3419
3420        let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
3421
3422        // Write data to Parquet but coerce names to match spec
3423        let props = Some(WriterProperties::builder().set_coerce_types(true).build());
3424        let file = tempfile::tempfile().unwrap();
3425        let mut writer =
3426            ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();
3427
3428        let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
3429        writer.write(&batch).unwrap();
3430        let file_metadata = writer.close().unwrap();
3431
3432        let schema = file_metadata.file_metadata().schema();
3433        // Coerced name of "item" should be "element"
3434        let list_field = &schema.get_fields()[0].get_fields()[0];
3435        assert_eq!(list_field.get_fields()[0].name(), "element");
3436
3437        let map_field = &schema.get_fields()[1].get_fields()[0];
3438        // Coerced name of "entries" should be "key_value"
3439        assert_eq!(map_field.name(), "key_value");
3440        // Coerced name of "keys" should be "key"
3441        assert_eq!(map_field.get_fields()[0].name(), "key");
3442        // Coerced name of "values" should be "value"
3443        assert_eq!(map_field.get_fields()[1].name(), "value");
3444
3445        // Double check schema after reading from the file
3446        let reader = SerializedFileReader::new(file).unwrap();
3447        let file_schema = reader.metadata().file_metadata().schema();
3448        let fields = file_schema.get_fields();
3449        let list_field = &fields[0].get_fields()[0];
3450        assert_eq!(list_field.get_fields()[0].name(), "element");
3451        let map_field = &fields[1].get_fields()[0];
3452        assert_eq!(map_field.name(), "key_value");
3453        assert_eq!(map_field.get_fields()[0].name(), "key");
3454        assert_eq!(map_field.get_fields()[1].name(), "value");
3455    }
3456
3457    #[test]
3458    fn fallback_flush_data_page() {
3459        // Tests that `Fallback::flush_data_page` clears all buffers correctly
3460        let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
3461        let values = Arc::new(StringArray::from(raw_values));
3462        let encodings = vec![
3463            Encoding::DELTA_BYTE_ARRAY,
3464            Encoding::DELTA_LENGTH_BYTE_ARRAY,
3465        ];
3466        let data_type = values.data_type().clone();
3467        let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
3468        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3469
3470        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
3471        let data_page_size_limit: usize = 32;
3472        let write_batch_size: usize = 16;
3473
3474        for encoding in &encodings {
3475            for row_group_size in row_group_sizes {
3476                let props = WriterProperties::builder()
3477                    .set_writer_version(WriterVersion::PARQUET_2_0)
3478                    .set_max_row_group_row_count(Some(row_group_size))
3479                    .set_dictionary_enabled(false)
3480                    .set_encoding(*encoding)
3481                    .set_data_page_size_limit(data_page_size_limit)
3482                    .set_write_batch_size(write_batch_size)
3483                    .build();
3484
3485                roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
3486                    let string_array_a = StringArray::from(a.clone());
3487                    let string_array_b = StringArray::from(b.clone());
3488                    let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
3489                    let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
3490                    assert_eq!(
3491                        vec_a, vec_b,
3492                        "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
3493                    );
3494                });
3495            }
3496        }
3497    }
3498
3499    #[test]
3500    fn arrow_writer_string_dictionary() {
3501        // define schema
3502        #[allow(deprecated)]
3503        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3504            "dictionary",
3505            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3506            true,
3507            42,
3508            true,
3509        )]));
3510
3511        // create some data
3512        let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3513            .iter()
3514            .copied()
3515            .collect();
3516
3517        // roundtrip the dictionary data using the explicit schema
3518        one_column_roundtrip_with_schema(Arc::new(d), schema);
3519    }
3520
3521    #[test]
3522    fn arrow_writer_test_type_compatibility() {
3523        fn ensure_compatible_write<T1, T2>(array1: T1, array2: T2, expected_result: T1)
3524        where
3525            T1: Array + 'static,
3526            T2: Array + 'static,
3527        {
3528            let schema1 = Arc::new(Schema::new(vec![Field::new(
3529                "a",
3530                array1.data_type().clone(),
3531                false,
3532            )]));
3533
3534            let file = tempfile().unwrap();
3535            let mut writer =
3536                ArrowWriter::try_new(file.try_clone().unwrap(), schema1.clone(), None).unwrap();
3537
3538            let rb1 = RecordBatch::try_new(schema1.clone(), vec![Arc::new(array1)]).unwrap();
3539            writer.write(&rb1).unwrap();
3540
3541            let schema2 = Arc::new(Schema::new(vec![Field::new(
3542                "a",
3543                array2.data_type().clone(),
3544                false,
3545            )]));
3546            let rb2 = RecordBatch::try_new(schema2, vec![Arc::new(array2)]).unwrap();
3547            writer.write(&rb2).unwrap();
3548
3549            writer.close().unwrap();
3550
3551            let mut record_batch_reader =
3552                ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
3553            let actual_batch = record_batch_reader.next().unwrap().unwrap();
3554
3555            let expected_batch =
3556                RecordBatch::try_new(schema1, vec![Arc::new(expected_result)]).unwrap();
3557            assert_eq!(actual_batch, expected_batch);
3558        }
3559
3560        // check compatibility between native arrays and dictionaries
3561
3562        ensure_compatible_write(
3563            DictionaryArray::new(
3564                UInt8Array::from_iter_values(vec![0]),
3565                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3566            ),
3567            StringArray::from_iter_values(vec!["barquet"]),
3568            DictionaryArray::new(
3569                UInt8Array::from_iter_values(vec![0, 1]),
3570                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3571            ),
3572        );
3573
3574        ensure_compatible_write(
3575            StringArray::from_iter_values(vec!["parquet"]),
3576            DictionaryArray::new(
3577                UInt8Array::from_iter_values(vec![0]),
3578                Arc::new(StringArray::from_iter_values(vec!["barquet"])),
3579            ),
3580            StringArray::from_iter_values(vec!["parquet", "barquet"]),
3581        );
3582
3583        // check compatibility between dictionaries with different key types
3584
3585        ensure_compatible_write(
3586            DictionaryArray::new(
3587                UInt8Array::from_iter_values(vec![0]),
3588                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3589            ),
3590            DictionaryArray::new(
3591                UInt16Array::from_iter_values(vec![0]),
3592                Arc::new(StringArray::from_iter_values(vec!["barquet"])),
3593            ),
3594            DictionaryArray::new(
3595                UInt8Array::from_iter_values(vec![0, 1]),
3596                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3597            ),
3598        );
3599
3600        // check compatibility between dictionaries with different value types
3601        ensure_compatible_write(
3602            DictionaryArray::new(
3603                UInt8Array::from_iter_values(vec![0]),
3604                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3605            ),
3606            DictionaryArray::new(
3607                UInt8Array::from_iter_values(vec![0]),
3608                Arc::new(LargeStringArray::from_iter_values(vec!["barquet"])),
3609            ),
3610            DictionaryArray::new(
3611                UInt8Array::from_iter_values(vec![0, 1]),
3612                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3613            ),
3614        );
3615
3616        // check compatibility between a dictionary and a native array with a different type
3617        ensure_compatible_write(
3618            DictionaryArray::new(
3619                UInt8Array::from_iter_values(vec![0]),
3620                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3621            ),
3622            LargeStringArray::from_iter_values(vec!["barquet"]),
3623            DictionaryArray::new(
3624                UInt8Array::from_iter_values(vec![0, 1]),
3625                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3626            ),
3627        );
3628
3629        // check compatibility for string types
3630
3631        ensure_compatible_write(
3632            StringArray::from_iter_values(vec!["parquet"]),
3633            LargeStringArray::from_iter_values(vec!["barquet"]),
3634            StringArray::from_iter_values(vec!["parquet", "barquet"]),
3635        );
3636
3637        ensure_compatible_write(
3638            LargeStringArray::from_iter_values(vec!["parquet"]),
3639            StringArray::from_iter_values(vec!["barquet"]),
3640            LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
3641        );
3642
3643        ensure_compatible_write(
3644            StringArray::from_iter_values(vec!["parquet"]),
3645            StringViewArray::from_iter_values(vec!["barquet"]),
3646            StringArray::from_iter_values(vec!["parquet", "barquet"]),
3647        );
3648
3649        ensure_compatible_write(
3650            StringViewArray::from_iter_values(vec!["parquet"]),
3651            StringArray::from_iter_values(vec!["barquet"]),
3652            StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
3653        );
3654
3655        ensure_compatible_write(
3656            LargeStringArray::from_iter_values(vec!["parquet"]),
3657            StringViewArray::from_iter_values(vec!["barquet"]),
3658            LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
3659        );
3660
3661        ensure_compatible_write(
3662            StringViewArray::from_iter_values(vec!["parquet"]),
3663            LargeStringArray::from_iter_values(vec!["barquet"]),
3664            StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
3665        );
3666
3667        // check compatibility for binary types
3668
3669        ensure_compatible_write(
3670            BinaryArray::from_iter_values(vec![b"parquet"]),
3671            LargeBinaryArray::from_iter_values(vec![b"barquet"]),
3672            BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3673        );
3674
3675        ensure_compatible_write(
3676            LargeBinaryArray::from_iter_values(vec![b"parquet"]),
3677            BinaryArray::from_iter_values(vec![b"barquet"]),
3678            LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3679        );
3680
3681        ensure_compatible_write(
3682            BinaryArray::from_iter_values(vec![b"parquet"]),
3683            BinaryViewArray::from_iter_values(vec![b"barquet"]),
3684            BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3685        );
3686
3687        ensure_compatible_write(
3688            BinaryViewArray::from_iter_values(vec![b"parquet"]),
3689            BinaryArray::from_iter_values(vec![b"barquet"]),
3690            BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
3691        );
3692
3693        ensure_compatible_write(
3694            BinaryViewArray::from_iter_values(vec![b"parquet"]),
3695            LargeBinaryArray::from_iter_values(vec![b"barquet"]),
3696            BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
3697        );
3698
3699        ensure_compatible_write(
3700            LargeBinaryArray::from_iter_values(vec![b"parquet"]),
3701            BinaryViewArray::from_iter_values(vec![b"barquet"]),
3702            LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3703        );
3704
3705        // check compatibility for list types
3706
3707        let list_field_metadata = HashMap::from_iter(vec![(
3708            PARQUET_FIELD_ID_META_KEY.to_string(),
3709            "1".to_string(),
3710        )]);
3711        let list_field = Field::new_list_field(DataType::Int32, false);
3712
3713        let values1 = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4]));
3714        let offsets1 = OffsetBuffer::new(vec![0, 2, 5].into());
3715
3716        let values2 = Arc::new(Int32Array::from(vec![5, 6, 7, 8, 9]));
3717        let offsets2 = OffsetBuffer::new(vec![0, 3, 5].into());
3718
3719        let values_expected = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]));
3720        let offsets_expected = OffsetBuffer::new(vec![0, 2, 5, 8, 10].into());
3721
3722        ensure_compatible_write(
3723            // when the initial schema has the metadata ...
3724            ListArray::try_new(
3725                Arc::new(
3726                    list_field
3727                        .clone()
3728                        .with_metadata(list_field_metadata.clone()),
3729                ),
3730                offsets1,
3731                values1,
3732                None,
3733            )
3734            .unwrap(),
3735            // ... and some intermediate schema doesn't have the metadata
3736            ListArray::try_new(Arc::new(list_field.clone()), offsets2, values2, None).unwrap(),
3737            // ... the write will still go through, and the resulting schema will inherit the initial metadata
3738            ListArray::try_new(
3739                Arc::new(
3740                    list_field
3741                        .clone()
3742                        .with_metadata(list_field_metadata.clone()),
3743                ),
3744                offsets_expected,
3745                values_expected,
3746                None,
3747            )
3748            .unwrap(),
3749        );
3750    }
3751
3752    #[test]
3753    fn arrow_writer_primitive_dictionary() {
3754        // define schema
3755        #[allow(deprecated)]
3756        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3757            "dictionary",
3758            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
3759            true,
3760            42,
3761            true,
3762        )]));
3763
3764        // create some data
3765        let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
3766        builder.append(12345678).unwrap();
3767        builder.append_null();
3768        builder.append(22345678).unwrap();
3769        builder.append(12345678).unwrap();
3770        let d = builder.finish();
3771
3772        one_column_roundtrip_with_schema(Arc::new(d), schema);
3773    }
3774
3775    #[test]
3776    fn arrow_writer_decimal32_dictionary() {
3777        let integers = vec![12345, 56789, 34567];
3778
3779        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3780
3781        let values = Decimal32Array::from(integers.clone())
3782            .with_precision_and_scale(5, 2)
3783            .unwrap();
3784
3785        let array = DictionaryArray::new(keys, Arc::new(values));
3786        one_column_roundtrip(Arc::new(array.clone()), true);
3787
3788        let values = Decimal32Array::from(integers)
3789            .with_precision_and_scale(9, 2)
3790            .unwrap();
3791
3792        let array = array.with_values(Arc::new(values));
3793        one_column_roundtrip(Arc::new(array), true);
3794    }
3795
3796    #[test]
3797    fn arrow_writer_decimal64_dictionary() {
3798        let integers = vec![12345, 56789, 34567];
3799
3800        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3801
3802        let values = Decimal64Array::from(integers.clone())
3803            .with_precision_and_scale(5, 2)
3804            .unwrap();
3805
3806        let array = DictionaryArray::new(keys, Arc::new(values));
3807        one_column_roundtrip(Arc::new(array.clone()), true);
3808
3809        let values = Decimal64Array::from(integers)
3810            .with_precision_and_scale(12, 2)
3811            .unwrap();
3812
3813        let array = array.with_values(Arc::new(values));
3814        one_column_roundtrip(Arc::new(array), true);
3815    }
3816
3817    #[test]
3818    fn arrow_writer_decimal128_dictionary() {
3819        let integers = vec![12345, 56789, 34567];
3820
3821        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3822
3823        let values = Decimal128Array::from(integers.clone())
3824            .with_precision_and_scale(5, 2)
3825            .unwrap();
3826
3827        let array = DictionaryArray::new(keys, Arc::new(values));
3828        one_column_roundtrip(Arc::new(array.clone()), true);
3829
3830        let values = Decimal128Array::from(integers)
3831            .with_precision_and_scale(12, 2)
3832            .unwrap();
3833
3834        let array = array.with_values(Arc::new(values));
3835        one_column_roundtrip(Arc::new(array), true);
3836    }
3837
3838    #[test]
3839    fn arrow_writer_decimal256_dictionary() {
3840        let integers = vec![
3841            i256::from_i128(12345),
3842            i256::from_i128(56789),
3843            i256::from_i128(34567),
3844        ];
3845
3846        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3847
3848        let values = Decimal256Array::from(integers.clone())
3849            .with_precision_and_scale(5, 2)
3850            .unwrap();
3851
3852        let array = DictionaryArray::new(keys, Arc::new(values));
3853        one_column_roundtrip(Arc::new(array.clone()), true);
3854
3855        let values = Decimal256Array::from(integers)
3856            .with_precision_and_scale(12, 2)
3857            .unwrap();
3858
3859        let array = array.with_values(Arc::new(values));
3860        one_column_roundtrip(Arc::new(array), true);
3861    }
3862
3863    #[test]
3864    fn arrow_writer_string_dictionary_unsigned_index() {
3865        // define schema
3866        #[allow(deprecated)]
3867        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3868            "dictionary",
3869            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3870            true,
3871            42,
3872            true,
3873        )]));
3874
3875        // create some data
3876        let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3877            .iter()
3878            .copied()
3879            .collect();
3880
3881        one_column_roundtrip_with_schema(Arc::new(d), schema);
3882    }
3883
3884    #[test]
3885    fn u32_min_max() {
3886        // check values roundtrip through parquet
3887        let src = [
3888            u32::MIN,
3889            u32::MIN + 1,
3890            (i32::MAX as u32) - 1,
3891            i32::MAX as u32,
3892            (i32::MAX as u32) + 1,
3893            u32::MAX - 1,
3894            u32::MAX,
3895        ];
3896        let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
3897        let files = one_column_roundtrip(values, false);
3898
3899        for file in files {
3900            // check statistics are valid
3901            let reader = SerializedFileReader::new(file).unwrap();
3902            let metadata = reader.metadata();
3903
3904            let mut row_offset = 0;
3905            for row_group in metadata.row_groups() {
3906                assert_eq!(row_group.num_columns(), 1);
3907                let column = row_group.column(0);
3908
3909                let num_values = column.num_values() as usize;
3910                let src_slice = &src[row_offset..row_offset + num_values];
3911                row_offset += column.num_values() as usize;
3912
3913                let stats = column.statistics().unwrap();
3914                if let Statistics::Int32(stats) = stats {
3915                    assert_eq!(
3916                        *stats.min_opt().unwrap() as u32,
3917                        *src_slice.iter().min().unwrap()
3918                    );
3919                    assert_eq!(
3920                        *stats.max_opt().unwrap() as u32,
3921                        *src_slice.iter().max().unwrap()
3922                    );
3923                } else {
3924                    panic!("Statistics::Int32 missing")
3925                }
3926            }
3927        }
3928    }
3929
3930    #[test]
3931    fn u64_min_max() {
3932        // check values roundtrip through parquet
3933        let src = [
3934            u64::MIN,
3935            u64::MIN + 1,
3936            (i64::MAX as u64) - 1,
3937            i64::MAX as u64,
3938            (i64::MAX as u64) + 1,
3939            u64::MAX - 1,
3940            u64::MAX,
3941        ];
3942        let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
3943        let files = one_column_roundtrip(values, false);
3944
3945        for file in files {
3946            // check statistics are valid
3947            let reader = SerializedFileReader::new(file).unwrap();
3948            let metadata = reader.metadata();
3949
3950            let mut row_offset = 0;
3951            for row_group in metadata.row_groups() {
3952                assert_eq!(row_group.num_columns(), 1);
3953                let column = row_group.column(0);
3954
3955                let num_values = column.num_values() as usize;
3956                let src_slice = &src[row_offset..row_offset + num_values];
3957                row_offset += column.num_values() as usize;
3958
3959                let stats = column.statistics().unwrap();
3960                if let Statistics::Int64(stats) = stats {
3961                    assert_eq!(
3962                        *stats.min_opt().unwrap() as u64,
3963                        *src_slice.iter().min().unwrap()
3964                    );
3965                    assert_eq!(
3966                        *stats.max_opt().unwrap() as u64,
3967                        *src_slice.iter().max().unwrap()
3968                    );
3969                } else {
3970                    panic!("Statistics::Int64 missing")
3971                }
3972            }
3973        }
3974    }
3975
3976    #[test]
3977    fn statistics_null_counts_only_nulls() {
3978        // check that null-count statistics for "only NULL"-columns are correct
3979        let values = Arc::new(UInt64Array::from(vec![None, None]));
3980        let files = one_column_roundtrip(values, true);
3981
3982        for file in files {
3983            // check statistics are valid
3984            let reader = SerializedFileReader::new(file).unwrap();
3985            let metadata = reader.metadata();
3986            assert_eq!(metadata.num_row_groups(), 1);
3987            let row_group = metadata.row_group(0);
3988            assert_eq!(row_group.num_columns(), 1);
3989            let column = row_group.column(0);
3990            let stats = column.statistics().unwrap();
3991            assert_eq!(stats.null_count_opt(), Some(2));
3992        }
3993    }
3994
3995    #[test]
3996    fn test_list_of_struct_roundtrip() {
3997        // define schema
3998        let int_field = Field::new("a", DataType::Int32, true);
3999        let int_field2 = Field::new("b", DataType::Int32, true);
4000
4001        let int_builder = Int32Builder::with_capacity(10);
4002        let int_builder2 = Int32Builder::with_capacity(10);
4003
4004        let struct_builder = StructBuilder::new(
4005            vec![int_field, int_field2],
4006            vec![Box::new(int_builder), Box::new(int_builder2)],
4007        );
4008        let mut list_builder = ListBuilder::new(struct_builder);
4009
4010        // Construct the following array
4011        // [{a: 1, b: 2}], [], null, [null, null], [{a: null, b: 3}], [{a: 2, b: null}]
4012
4013        // [{a: 1, b: 2}]
4014        let values = list_builder.values();
4015        values
4016            .field_builder::<Int32Builder>(0)
4017            .unwrap()
4018            .append_value(1);
4019        values
4020            .field_builder::<Int32Builder>(1)
4021            .unwrap()
4022            .append_value(2);
4023        values.append(true);
4024        list_builder.append(true);
4025
4026        // []
4027        list_builder.append(true);
4028
4029        // null
4030        list_builder.append(false);
4031
4032        // [null, null]
4033        let values = list_builder.values();
4034        values
4035            .field_builder::<Int32Builder>(0)
4036            .unwrap()
4037            .append_null();
4038        values
4039            .field_builder::<Int32Builder>(1)
4040            .unwrap()
4041            .append_null();
4042        values.append(false);
4043        values
4044            .field_builder::<Int32Builder>(0)
4045            .unwrap()
4046            .append_null();
4047        values
4048            .field_builder::<Int32Builder>(1)
4049            .unwrap()
4050            .append_null();
4051        values.append(false);
4052        list_builder.append(true);
4053
4054        // [{a: null, b: 3}]
4055        let values = list_builder.values();
4056        values
4057            .field_builder::<Int32Builder>(0)
4058            .unwrap()
4059            .append_null();
4060        values
4061            .field_builder::<Int32Builder>(1)
4062            .unwrap()
4063            .append_value(3);
4064        values.append(true);
4065        list_builder.append(true);
4066
4067        // [{a: 2, b: null}]
4068        let values = list_builder.values();
4069        values
4070            .field_builder::<Int32Builder>(0)
4071            .unwrap()
4072            .append_value(2);
4073        values
4074            .field_builder::<Int32Builder>(1)
4075            .unwrap()
4076            .append_null();
4077        values.append(true);
4078        list_builder.append(true);
4079
4080        let array = Arc::new(list_builder.finish());
4081
4082        one_column_roundtrip(array, true);
4083    }
4084
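    /// Returns the number of rows in each row group, in file order.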
4085    fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
4086        metadata.row_groups().iter().map(|x| x.num_rows()).collect()
4087    }
4088
4089    #[test]
4090    fn test_aggregates_records() {
4091        let arrays = [
4092            Int32Array::from((0..100).collect::<Vec<_>>()),
4093            Int32Array::from((0..50).collect::<Vec<_>>()),
4094            Int32Array::from((200..500).collect::<Vec<_>>()),
4095        ];
4096
4097        let schema = Arc::new(Schema::new(vec![Field::new(
4098            "int",
4099            ArrowDataType::Int32,
4100            false,
4101        )]));
4102
4103        let file = tempfile::tempfile().unwrap();
4104
4105        let props = WriterProperties::builder()
4106            .set_max_row_group_row_count(Some(200))
4107            .build();
4108
4109        let mut writer =
4110            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
4111
4112        for array in arrays {
4113            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
4114            writer.write(&batch).unwrap();
4115        }
4116
4117        writer.close().unwrap();
4118
4119        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4120        assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);
4121
4122        let batches = builder
4123            .with_batch_size(100)
4124            .build()
4125            .unwrap()
4126            .collect::<ArrowResult<Vec<_>>>()
4127            .unwrap();
4128
4129        assert_eq!(batches.len(), 5);
4130        assert!(batches.iter().all(|x| x.num_columns() == 1));
4131
4132        let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
4133
4134        assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
4135
4136        let values: Vec<_> = batches
4137            .iter()
4138            .flat_map(|x| {
4139                x.column(0)
4140                    .as_any()
4141                    .downcast_ref::<Int32Array>()
4142                    .unwrap()
4143                    .values()
4144                    .iter()
4145                    .cloned()
4146            })
4147            .collect();
4148
4149        let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
4150        assert_eq!(&values, &expected_values)
4151    }
4152
4153    #[test]
4154    fn complex_aggregate() {
4155        // Tests aggregating nested data
4156        let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
4157        let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
4158        let struct_a = Arc::new(Field::new(
4159            "struct_a",
4160            DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
4161            true,
4162        ));
4163
4164        let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
4165        let struct_b = Arc::new(Field::new(
4166            "struct_b",
4167            DataType::Struct(vec![list_a.clone()].into()),
4168            false,
4169        ));
4170
4171        let schema = Arc::new(Schema::new(vec![struct_b]));
4172
4173        // create nested data
4174        let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
4175        let field_b_array =
4176            Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);
4177
4178        let struct_a_array = StructArray::from(vec![
4179            (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
4180            (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
4181        ]);
4182
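        // Offsets [0, 1, 1, 3, 3, 5] with validity [true, false, true, false, true]
        // yield five lists: [s0], null, [s1, s2], null, [s3, s4]; the sixth child
        // struct (leaf_a = 6) is never referenced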
4183        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
4184            .len(5)
4185            .add_buffer(Buffer::from_iter(vec![
4186                0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
4187            ]))
4188            .null_bit_buffer(Some(Buffer::from_iter(vec![
4189                true, false, true, false, true,
4190            ])))
4191            .child_data(vec![struct_a_array.into_data()])
4192            .build()
4193            .unwrap();
4194
4195        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
4196        let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);
4197
4198        let batch1 =
4199            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
4200                .unwrap();
4201
4202        let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
4203        let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);
4204
4205        let struct_a_array = StructArray::from(vec![
4206            (field_a, Arc::new(field_a_array) as ArrayRef),
4207            (field_b, Arc::new(field_b_array) as ArrayRef),
4208        ]);
4209
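        // Offsets [0, 4, 5] with no validity buffer yield two non-null lists of
        // four structs and one struct respectively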
4210        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
4211            .len(2)
4212            .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
4213            .child_data(vec![struct_a_array.into_data()])
4214            .build()
4215            .unwrap();
4216
4217        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
4218        let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);
4219
4220        let batch2 =
4221            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
4222                .unwrap();
4223
4224        let batches = &[batch1, batch2];
4225
4226        // Verify data is as expected
4227
4228        let expected = r#"
4229            +-------------------------------------------------------------------------------------------------------+
4230            | struct_b                                                                                              |
4231            +-------------------------------------------------------------------------------------------------------+
4232            | {list: [{leaf_a: 1, leaf_b: 1}]}                                                                      |
4233            | {list: }                                                                                              |
4234            | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]}                                               |
4235            | {list: }                                                                                              |
4236            | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]}                                                |
4237            | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
4238            | {list: [{leaf_a: 10, leaf_b: }]}                                                                      |
4239            +-------------------------------------------------------------------------------------------------------+
4240        "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");
4241
4242        let actual = pretty_format_batches(batches).unwrap().to_string();
4243        assert_eq!(actual, expected);
4244
4245        // Write data
4246        let file = tempfile::tempfile().unwrap();
4247        let props = WriterProperties::builder()
4248            .set_max_row_group_row_count(Some(6))
4249            .build();
4250
4251        let mut writer =
4252            ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
4253
4254        for batch in batches {
4255            writer.write(batch).unwrap();
4256        }
4257        writer.close().unwrap();
4258
4259        // Read Data
4260        // Should have written the entire first batch (5 rows) and the first row of the
4261        // second batch to the first row group, leaving a single row in the second row group
4262
4263        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4264        assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);
4265
4266        let batches = builder
4267            .with_batch_size(2)
4268            .build()
4269            .unwrap()
4270            .collect::<ArrowResult<Vec<_>>>()
4271            .unwrap();
4272
4273        assert_eq!(batches.len(), 4);
4274        let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
4275        assert_eq!(&batch_counts, &[2, 2, 2, 1]);
4276
4277        let actual = pretty_format_batches(&batches).unwrap().to_string();
4278        assert_eq!(actual, expected);
4279    }
4280
4281    #[test]
4282    fn test_arrow_writer_metadata() {
4283        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
4284        let file_schema = batch_schema.clone().with_metadata(
4285            vec![("foo".to_string(), "bar".to_string())]
4286                .into_iter()
4287                .collect(),
4288        );
4289
4290        let batch = RecordBatch::try_new(
4291            Arc::new(batch_schema),
4292            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4293        )
4294        .unwrap();
4295
4296        let mut buf = Vec::with_capacity(1024);
4297        let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
4298        writer.write(&batch).unwrap();
4299        writer.close().unwrap();
4300    }
4301
4302    #[test]
4303    fn test_arrow_writer_nullable() {
4304        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
4305        let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
4306        let file_schema = Arc::new(file_schema);
4307
4308        let batch = RecordBatch::try_new(
4309            Arc::new(batch_schema),
4310            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4311        )
4312        .unwrap();
4313
4314        let mut buf = Vec::with_capacity(1024);
4315        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
4316        writer.write(&batch).unwrap();
4317        writer.close().unwrap();
4318
4319        let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
4320        let back = read.next().unwrap().unwrap();
4321        assert_eq!(back.schema(), file_schema);
4322        assert_ne!(back.schema(), batch.schema());
4323        assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
4324    }
4325
4326    #[test]
4327    fn in_progress_accounting() {
4328        // define schema
4329        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
4330
4331        // create some data
4332        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
4333
4334        // build a record batch
4335        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
4336
4337        let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
4338
4339        // starts empty
4340        assert_eq!(writer.in_progress_size(), 0);
4341        assert_eq!(writer.in_progress_rows(), 0);
4342        assert_eq!(writer.memory_size(), 0);
4343        assert_eq!(writer.bytes_written(), 4); // Initial 4-byte "PAR1" magic header
4344        writer.write(&batch).unwrap();
4345
4346        // updated on write
4347        let initial_size = writer.in_progress_size();
4348        assert!(initial_size > 0);
4349        assert_eq!(writer.in_progress_rows(), 5);
4350        let initial_memory = writer.memory_size();
4351        assert!(initial_memory > 0);
4352        // the memory estimate is at least as large as the estimated encoded size
4353        assert!(
4354            initial_size <= initial_memory,
4355            "{initial_size} <= {initial_memory}"
4356        );
4357
4358        // updated on second write
4359        writer.write(&batch).unwrap();
4360        assert!(writer.in_progress_size() > initial_size);
4361        assert_eq!(writer.in_progress_rows(), 10);
4362        assert!(writer.memory_size() > initial_memory);
4363        assert!(
4364            writer.in_progress_size() <= writer.memory_size(),
4365            "in_progress_size {} <= memory_size {}",
4366            writer.in_progress_size(),
4367            writer.memory_size()
4368        );
4369
4370        // in progress tracking is cleared, but the overall data written is updated
4371        let pre_flush_bytes_written = writer.bytes_written();
4372        writer.flush().unwrap();
4373        assert_eq!(writer.in_progress_size(), 0);
4374        assert_eq!(writer.memory_size(), 0);
4375        assert!(writer.bytes_written() > pre_flush_bytes_written);
4376
4377        writer.close().unwrap();
4378    }
4379
4380    #[test]
4381    fn test_writer_all_null() {
4382        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
4383        let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
4384        let batch = RecordBatch::try_from_iter(vec![
4385            ("a", Arc::new(a) as ArrayRef),
4386            ("b", Arc::new(b) as ArrayRef),
4387        ])
4388        .unwrap();
4389
4390        let mut buf = Vec::with_capacity(1024);
4391        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
4392        writer.write(&batch).unwrap();
4393        writer.close().unwrap();
4394
4395        let bytes = Bytes::from(buf);
4396        let options = ReadOptionsBuilder::new().with_page_index().build();
4397        let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
4398        let index = reader.metadata().offset_index().unwrap();
4399
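        // offset_index is indexed as [row_group][column]; the all-null column "b"
        // still produces a data page and therefore a page location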
4400        assert_eq!(index.len(), 1);
4401        assert_eq!(index[0].len(), 2); // 2 columns
4402        assert_eq!(index[0][0].page_locations().len(), 1); // 1 page
4403        assert_eq!(index[0][1].page_locations().len(), 1); // 1 page
4404    }
4405
4406    #[test]
4407    fn test_disabled_statistics_with_page() {
4408        let file_schema = Schema::new(vec![
4409            Field::new("a", DataType::Utf8, true),
4410            Field::new("b", DataType::Utf8, true),
4411        ]);
4412        let file_schema = Arc::new(file_schema);
4413
4414        let batch = RecordBatch::try_new(
4415            file_schema.clone(),
4416            vec![
4417                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
4418                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
4419            ],
4420        )
4421        .unwrap();
4422
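        // Disable statistics file-wide, then re-enable them at page level for column
        // "a" only; "a" should get both chunk-level statistics and a column index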
4423        let props = WriterProperties::builder()
4424            .set_statistics_enabled(EnabledStatistics::None)
4425            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
4426            .build();
4427
4428        let mut buf = Vec::with_capacity(1024);
4429        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
4430        writer.write(&batch).unwrap();
4431
4432        let metadata = writer.close().unwrap();
4433        assert_eq!(metadata.num_row_groups(), 1);
4434        let row_group = metadata.row_group(0);
4435        assert_eq!(row_group.num_columns(), 2);
4436        // Column "a" has both offset and column index, as requested
4437        assert!(row_group.column(0).offset_index_offset().is_some());
4438        assert!(row_group.column(0).column_index_offset().is_some());
4439        // Column "b" should only have offset index
4440        assert!(row_group.column(1).offset_index_offset().is_some());
4441        assert!(row_group.column(1).column_index_offset().is_none());
4442
4443        let options = ReadOptionsBuilder::new().with_page_index().build();
4444        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
4445
4446        let row_group = reader.get_row_group(0).unwrap();
4447        let a_col = row_group.metadata().column(0);
4448        let b_col = row_group.metadata().column(1);
4449
4450        // Column chunk of column "a" should have chunk level statistics
4451        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
4452            let min = byte_array_stats.min_opt().unwrap();
4453            let max = byte_array_stats.max_opt().unwrap();
4454
4455            assert_eq!(min.as_bytes(), b"a");
4456            assert_eq!(max.as_bytes(), b"d");
4457        } else {
4458            panic!("expecting Statistics::ByteArray");
4459        }
4460
4461        // The column chunk for column "b" shouldn't have statistics
4462        assert!(b_col.statistics().is_none());
4463
4464        let offset_index = reader.metadata().offset_index().unwrap();
4465        assert_eq!(offset_index.len(), 1); // 1 row group
4466        assert_eq!(offset_index[0].len(), 2); // 2 columns
4467
4468        let column_index = reader.metadata().column_index().unwrap();
4469        assert_eq!(column_index.len(), 1); // 1 row group
4470        assert_eq!(column_index[0].len(), 2); // 2 columns
4471
4472        let a_idx = &column_index[0][0];
4473        assert!(
4474            matches!(a_idx, ColumnIndexMetaData::BYTE_ARRAY(_)),
4475            "{a_idx:?}"
4476        );
4477        let b_idx = &column_index[0][1];
4478        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
4479    }
4480
4481    #[test]
4482    fn test_disabled_statistics_with_chunk() {
4483        let file_schema = Schema::new(vec![
4484            Field::new("a", DataType::Utf8, true),
4485            Field::new("b", DataType::Utf8, true),
4486        ]);
4487        let file_schema = Arc::new(file_schema);
4488
4489        let batch = RecordBatch::try_new(
4490            file_schema.clone(),
4491            vec![
4492                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
4493                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
4494            ],
4495        )
4496        .unwrap();
4497
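        // EnabledStatistics::Chunk for column "a" writes chunk-level statistics but,
        // unlike EnabledStatistics::Page above, produces no column index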
4498        let props = WriterProperties::builder()
4499            .set_statistics_enabled(EnabledStatistics::None)
4500            .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
4501            .build();
4502
4503        let mut buf = Vec::with_capacity(1024);
4504        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
4505        writer.write(&batch).unwrap();
4506
4507        let metadata = writer.close().unwrap();
4508        assert_eq!(metadata.num_row_groups(), 1);
4509        let row_group = metadata.row_group(0);
4510        assert_eq!(row_group.num_columns(), 2);
4511        // Column "a" should only have offset index
4512        assert!(row_group.column(0).offset_index_offset().is_some());
4513        assert!(row_group.column(0).column_index_offset().is_none());
4514        // Column "b" should only have offset index
4515        assert!(row_group.column(1).offset_index_offset().is_some());
4516        assert!(row_group.column(1).column_index_offset().is_none());
4517
4518        let options = ReadOptionsBuilder::new().with_page_index().build();
4519        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
4520
4521        let row_group = reader.get_row_group(0).unwrap();
4522        let a_col = row_group.metadata().column(0);
4523        let b_col = row_group.metadata().column(1);
4524
4525        // Column chunk of column "a" should have chunk level statistics
4526        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
4527            let min = byte_array_stats.min_opt().unwrap();
4528            let max = byte_array_stats.max_opt().unwrap();
4529
4530            assert_eq!(min.as_bytes(), b"a");
4531            assert_eq!(max.as_bytes(), b"d");
4532        } else {
4533            panic!("expecting Statistics::ByteArray");
4534        }
4535
4536        // The column chunk for column "b" shouldn't have statistics
4537        assert!(b_col.statistics().is_none());
4538
4539        let column_index = reader.metadata().column_index().unwrap();
4540        assert_eq!(column_index.len(), 1); // 1 row group
4541        assert_eq!(column_index[0].len(), 2); // 2 columns
4542
4543        let a_idx = &column_index[0][0];
4544        assert!(matches!(a_idx, ColumnIndexMetaData::NONE), "{a_idx:?}");
4545        let b_idx = &column_index[0][1];
4546        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
4547    }
4548
4549    #[test]
4550    fn test_arrow_writer_skip_metadata() {
4551        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
4552        let file_schema = Arc::new(batch_schema.clone());
4553
4554        let batch = RecordBatch::try_new(
4555            Arc::new(batch_schema),
4556            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4557        )
4558        .unwrap();
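        // Skipping the Arrow metadata omits the serialized Arrow schema
        // (ARROW_SCHEMA_META_KEY) from the file's key-value metadata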
4559        let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
4560
4561        let mut buf = Vec::with_capacity(1024);
4562        let mut writer =
4563            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
4564        writer.write(&batch).unwrap();
4565        writer.close().unwrap();
4566
4567        let bytes = Bytes::from(buf);
4568        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
4569        assert_eq!(file_schema, *reader_builder.schema());
4570        if let Some(key_value_metadata) = reader_builder
4571            .metadata()
4572            .file_metadata()
4573            .key_value_metadata()
4574        {
4575            assert!(
4576                !key_value_metadata
4577                    .iter()
4578                    .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY)
4579            );
4580        }
4581    }
4582
4583    #[test]
4584    fn mismatched_schemas() {
4585        let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
4586        let file_schema = Arc::new(Schema::new(vec![Field::new(
4587            "temperature",
4588            DataType::Float64,
4589            false,
4590        )]));
4591
4592        let batch = RecordBatch::try_new(
4593            Arc::new(batch_schema),
4594            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4595        )
4596        .unwrap();
4597
4598        let mut buf = Vec::with_capacity(1024);
4599        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
4600
4601        let err = writer.write(&batch).unwrap_err().to_string();
4602        assert_eq!(
4603            err,
4604            "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
4605        );
4606    }
4607
4608    #[test]
4609    // https://github.com/apache/arrow-rs/issues/6988
4610    fn test_roundtrip_empty_schema() {
4611        // create empty record batch with empty schema
4612        let empty_batch = RecordBatch::try_new_with_options(
4613            Arc::new(Schema::empty()),
4614            vec![],
4615            &RecordBatchOptions::default().with_row_count(Some(0)),
4616        )
4617        .unwrap();
4618
4619        // write to parquet
4620        let mut parquet_bytes: Vec<u8> = Vec::new();
4621        let mut writer =
4622            ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
4623        writer.write(&empty_batch).unwrap();
4624        writer.close().unwrap();
4625
4626        // read from parquet
4627        let bytes = Bytes::from(parquet_bytes);
4628        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
4629        assert_eq!(reader.schema(), &empty_batch.schema());
4630        let batches: Vec<_> = reader
4631            .build()
4632            .unwrap()
4633            .collect::<ArrowResult<Vec<_>>>()
4634            .unwrap();
4635        assert_eq!(batches.len(), 0);
4636    }
4637
4638    #[test]
4639    fn test_page_stats_not_written_by_default() {
4640        let string_field = Field::new("a", DataType::Utf8, false);
4641        let schema = Schema::new(vec![string_field]);
4642        let raw_string_values = vec!["Blart Versenwald III"];
4643        let string_values = StringArray::from(raw_string_values.clone());
4644        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();
4645
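        // EnabledStatistics::Page populates the column index, but statistics are only
        // embedded in the page headers themselves when set_write_page_header_statistics
        // is enabled (it defaults to off, which is what this test verifies)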
4646        let props = WriterProperties::builder()
4647            .set_statistics_enabled(EnabledStatistics::Page)
4648            .set_dictionary_enabled(false)
4649            .set_encoding(Encoding::PLAIN)
4650            .set_compression(crate::basic::Compression::UNCOMPRESSED)
4651            .build();
4652
4653        let file = roundtrip_opts(&batch, props);
4654
4655        // read file and decode page headers
4656        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
4657
4658        // decode first page header
4659        let first_page = &file[4..]; // skip the 4-byte "PAR1" magic at the start of the file
4660        let mut prot = ThriftSliceInputProtocol::new(first_page);
4661        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
4662        let stats = hdr.data_page_header.unwrap().statistics;
4663
4664        assert!(stats.is_none());
4665    }
4666
4667    #[test]
4668    fn test_page_stats_when_enabled() {
4669        let string_field = Field::new("a", DataType::Utf8, false);
4670        let schema = Schema::new(vec![string_field]);
4671        let raw_string_values = vec!["Blart Versenwald III", "Andrew Lamb"];
4672        let string_values = StringArray::from(raw_string_values.clone());
4673        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();
4674
4675        let props = WriterProperties::builder()
4676            .set_statistics_enabled(EnabledStatistics::Page)
4677            .set_dictionary_enabled(false)
4678            .set_encoding(Encoding::PLAIN)
4679            .set_write_page_header_statistics(true)
4680            .set_compression(crate::basic::Compression::UNCOMPRESSED)
4681            .build();
4682
4683        let file = roundtrip_opts(&batch, props);
4684
4685        // read file and decode page headers
4686        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
4687
4688        // decode first page header
4689        let first_page = &file[4..];
4690        let mut prot = ThriftSliceInputProtocol::new(first_page);
4691        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
4692        let stats = hdr.data_page_header.unwrap().statistics;
4693
4694        let stats = stats.unwrap();
4695        // check that min/max were actually written to the page
4696        assert!(stats.is_max_value_exact.unwrap());
4697        assert!(stats.is_min_value_exact.unwrap());
4698        assert_eq!(stats.max_value.unwrap(), "Blart Versenwald III".as_bytes());
4699        assert_eq!(stats.min_value.unwrap(), "Andrew Lamb".as_bytes());
4700    }
4701
4702    #[test]
4703    fn test_page_stats_truncation() {
4704        let string_field = Field::new("a", DataType::Utf8, false);
4705        let binary_field = Field::new("b", DataType::Binary, false);
4706        let schema = Schema::new(vec![string_field, binary_field]);
4707
4708        let raw_string_values = vec!["Blart Versenwald III"];
4709        let raw_binary_values = [b"Blart Versenwald III".to_vec()];
4710        let raw_binary_value_refs = raw_binary_values
4711            .iter()
4712            .map(|x| x.as_slice())
4713            .collect::<Vec<_>>();
4714
4715        let string_values = StringArray::from(raw_string_values.clone());
4716        let binary_values = BinaryArray::from(raw_binary_value_refs);
4717        let batch = RecordBatch::try_new(
4718            Arc::new(schema),
4719            vec![Arc::new(string_values), Arc::new(binary_values)],
4720        )
4721        .unwrap();
4722
4723        let props = WriterProperties::builder()
4724            .set_statistics_truncate_length(Some(2))
4725            .set_dictionary_enabled(false)
4726            .set_encoding(Encoding::PLAIN)
4727            .set_write_page_header_statistics(true)
4728            .set_compression(crate::basic::Compression::UNCOMPRESSED)
4729            .build();
4730
4731        let file = roundtrip_opts(&batch, props);
4732
4733        // read file and decode page headers
4734        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
4735
4736        // decode first page header
4737        let first_page = &file[4..];
4738        let mut prot = ThriftSliceInputProtocol::new(first_page);
4739        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
4740        let stats = hdr.data_page_header.unwrap().statistics;
4741        assert!(stats.is_some());
4742        let stats = stats.unwrap();
4743        // check that min/max were properly truncated
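        // the min is simply truncated to its first two bytes ("Bl"); the max must stay
        // an upper bound, so its last retained byte is incremented ("Bl" -> "Bm")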
4744        assert!(!stats.is_max_value_exact.unwrap());
4745        assert!(!stats.is_min_value_exact.unwrap());
4746        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
4747        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
4748
4749        // check the second page (column "b"'s page): skip the first page's compressed data to reach the next header
4750        let second_page = &prot.as_slice()[hdr.compressed_page_size as usize..];
4751        let mut prot = ThriftSliceInputProtocol::new(second_page);
4752        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
4753        let stats = hdr.data_page_header.unwrap().statistics;
4754        assert!(stats.is_some());
4755        let stats = stats.unwrap();
4756        // check that min/max were properly truncated
4757        assert!(!stats.is_max_value_exact.unwrap());
4758        assert!(!stats.is_min_value_exact.unwrap());
4759        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
4760        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
4761    }
4762
4763    #[test]
4764    fn test_page_encoding_statistics_roundtrip() {
4765        let batch_schema = Schema::new(vec![Field::new(
4766            "int32",
4767            arrow_schema::DataType::Int32,
4768            false,
4769        )]);
4770
4771        let batch = RecordBatch::try_new(
4772            Arc::new(batch_schema.clone()),
4773            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4774        )
4775        .unwrap();
4776
4777        let mut file: File = tempfile::tempfile().unwrap();
4778        let mut writer = ArrowWriter::try_new(&mut file, Arc::new(batch_schema), None).unwrap();
4779        writer.write(&batch).unwrap();
4780        let file_metadata = writer.close().unwrap();
4781
4782        assert_eq!(file_metadata.num_row_groups(), 1);
4783        assert_eq!(file_metadata.row_group(0).num_columns(), 1);
4784        assert!(
4785            file_metadata
4786                .row_group(0)
4787                .column(0)
4788                .page_encoding_stats()
4789                .is_some()
4790        );
4791        let chunk_page_stats = file_metadata
4792            .row_group(0)
4793            .column(0)
4794            .page_encoding_stats()
4795            .unwrap();
4796
4797        // check that the read metadata is also correct
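        // with_encoding_stats_as_mask(false) is assumed to keep the stats in their
        // full form (rather than a condensed mask) so they compare equal to what was
        // written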
4798        let options = ReadOptionsBuilder::new()
4799            .with_page_index()
4800            .with_encoding_stats_as_mask(false)
4801            .build();
4802        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
4803
4804        let rowgroup = reader.get_row_group(0).expect("row group missing");
4805        assert_eq!(rowgroup.num_columns(), 1);
4806        let column = rowgroup.metadata().column(0);
4807        assert!(column.page_encoding_stats().is_some());
4808        let file_page_stats = column.page_encoding_stats().unwrap();
4809        assert_eq!(chunk_page_stats, file_page_stats);
4810    }
4811
4812    #[test]
4813    fn test_different_dict_page_size_limit() {
4814        let array = Arc::new(Int64Array::from_iter(0..1024 * 1024));
4815        let schema = Arc::new(Schema::new(vec![
4816            Field::new("col0", arrow_schema::DataType::Int64, false),
4817            Field::new("col1", arrow_schema::DataType::Int64, false),
4818        ]));
4819        let batch =
4820            arrow_array::RecordBatch::try_new(schema.clone(), vec![array.clone(), array]).unwrap();
4821
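        // col0 uses the file-wide 1 MiB dictionary page limit; the per-column
        // override lets col1's dictionary page grow to 4 MiB before fallback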
4822        let props = WriterProperties::builder()
4823            .set_dictionary_page_size_limit(1024 * 1024)
4824            .set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 1024 * 1024 * 4)
4825            .build();
4826        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
4827        writer.write(&batch).unwrap();
4828        let data = Bytes::from(writer.into_inner().unwrap());
4829
4830        let mut metadata = ParquetMetaDataReader::new();
4831        metadata.try_parse(&data).unwrap();
4832        let metadata = metadata.finish().unwrap();
4833        let col0_meta = metadata.row_group(0).column(0);
4834        let col1_meta = metadata.row_group(0).column(1);
4835
4836        let get_dict_page_size = move |meta: &ColumnChunkMetaData| {
4837            let mut reader =
4838                SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
4839            let page = reader.get_next_page().unwrap().unwrap();
4840            match page {
4841                Page::DictionaryPage { buf, .. } => buf.len(),
4842                _ => panic!("expected DictionaryPage"),
4843            }
4844        };
4845
4846        assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
4847        assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
4848    }
4849
4850    struct WriteBatchesShape {
4851        num_batches: usize,    // number of record batches to write
4852        rows_per_batch: usize, // rows in each batch
4853        row_size: usize,       // byte width of the zero-padded string value in each row
4854    }
4855
4856    /// Helper function to write batches of the provided `WriteBatchesShape` into an `ArrowWriter`, returning a reader builder over the resulting file
4857    fn write_batches(
4858        WriteBatchesShape {
4859            num_batches,
4860            rows_per_batch,
4861            row_size,
4862        }: WriteBatchesShape,
4863        props: WriterProperties,
4864    ) -> ParquetRecordBatchReaderBuilder<File> {
4865        let schema = Arc::new(Schema::new(vec![Field::new(
4866            "str",
4867            ArrowDataType::Utf8,
4868            false,
4869        )]));
4870        let file = tempfile::tempfile().unwrap();
4871        let mut writer =
4872            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
4873
4874        for batch_idx in 0..num_batches {
4875            let strings: Vec<String> = (0..rows_per_batch)
4876                .map(|i| format!("{:0>width$}", batch_idx * 10 + i, width = row_size))
4877                .collect();
4878            let array = StringArray::from(strings);
4879            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
4880            writer.write(&batch).unwrap();
4881        }
4882        writer.close().unwrap();
4883        ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
4884    }
4885
4886    #[test]
4887    // When both limits are None, all data should go into a single row group
4888    fn test_row_group_limit_none_writes_single_row_group() {
4889        let props = WriterProperties::builder()
4890            .set_max_row_group_row_count(None)
4891            .set_max_row_group_bytes(None)
4892            .build();
4893
4894        let builder = write_batches(
4895            WriteBatchesShape {
4896                num_batches: 1,
4897                rows_per_batch: 1000,
4898                row_size: 4,
4899            },
4900            props,
4901        );
4902
4903        assert_eq!(
4904            &row_group_sizes(builder.metadata()),
4905            &[1000],
4906            "With no limits, all rows should be in a single row group"
4907        );
4908    }
4909
4910    #[test]
4911    // When only max_row_group_size is set, respect the row limit
4912    fn test_row_group_limit_rows_only() {
4913        let props = WriterProperties::builder()
4914            .set_max_row_group_row_count(Some(300))
4915            .set_max_row_group_bytes(None)
4916            .build();
4917
4918        let builder = write_batches(
4919            WriteBatchesShape {
4920                num_batches: 1,
4921                rows_per_batch: 1000,
4922                row_size: 4,
4923            },
4924            props,
4925        );
4926
4927        assert_eq!(
4928            &row_group_sizes(builder.metadata()),
4929            &[300, 300, 300, 100],
4930            "Row groups should be split by row count"
4931        );
4932    }
4933
4934    #[test]
4935    // When only max_row_group_bytes is set, respect the byte limit
4936    fn test_row_group_limit_bytes_only() {
4937        let props = WriterProperties::builder()
4938            .set_max_row_group_row_count(None)
4939            // Set byte limit to fit roughly 30-35 rows of data (~100 bytes of string payload each)
4940            .set_max_row_group_bytes(Some(3500))
4941            .build();
4942
4943        let builder = write_batches(
4944            WriteBatchesShape {
4945                num_batches: 10,
4946                rows_per_batch: 10,
4947                row_size: 100,
4948            },
4949            props,
4950        );
4951
4952        let sizes = row_group_sizes(builder.metadata());
4953
4954        assert!(
4955            sizes.len() > 1,
4956            "Should have multiple row groups due to byte limit, got {sizes:?}",
4957        );
4958
4959        let total_rows: i64 = sizes.iter().sum();
4960        assert_eq!(total_rows, 100, "Total rows should be preserved");
4961    }
4962
4963    #[test]
4964    // If an in-progress row group is already oversized, it should be flushed before writing more.
4965    fn test_row_group_limit_bytes_flushes_when_current_group_already_too_large() {
4966        let schema = Arc::new(Schema::new(vec![Field::new(
4967            "str",
4968            ArrowDataType::Utf8,
4969            false,
4970        )]));
4971        let file = tempfile::tempfile().unwrap();
4972
4973        // Start with no byte limit so we can intentionally build an oversized in-progress row group.
4974        let props = WriterProperties::builder()
4975            .set_max_row_group_row_count(None)
4976            .set_max_row_group_bytes(None)
4977            .build();
4978        let mut writer =
4979            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
4980
4981        let first_array = StringArray::from(
4982            (0..10)
4983                .map(|i| format!("{:0>100}", i))
4984                .collect::<Vec<String>>(),
4985        );
4986        let first_batch =
4987            RecordBatch::try_new(schema.clone(), vec![Arc::new(first_array)]).unwrap();
4988        writer.write(&first_batch).unwrap();
4989        assert_eq!(writer.in_progress_rows(), 10);
4990
4991        // Tighten the limit below the current in-progress bytes to exercise:
4992        // `if current_bytes >= max_bytes { self.flush()?; ... }`
4993        writer.max_row_group_bytes = Some(1);
4994
4995        let second_array = StringArray::from(vec!["x".to_string()]);
4996        let second_batch =
4997            RecordBatch::try_new(schema.clone(), vec![Arc::new(second_array)]).unwrap();
4998        writer.write(&second_batch).unwrap();
4999        writer.close().unwrap();
5000        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
5001
5002        assert_eq!(
5003            &row_group_sizes(builder.metadata()),
5004            &[10, 1],
5005            "The second write should flush an oversized in-progress row group first",
5006        );
5007    }
5008
5009    #[test]
5010    // When both limits are set, the row limit triggers first
5011    fn test_row_group_limit_both_row_wins_single_batch() {
5012        let props = WriterProperties::builder()
5013            .set_max_row_group_row_count(Some(200)) // Will trigger at 200 rows
5014            .set_max_row_group_bytes(Some(1024 * 1024)) // 1MB - won't trigger for small int data
5015            .build();
5016
5017        let builder = write_batches(
5018            WriteBatchesShape {
5019                num_batches: 1,
5020                row_size: 4,
5021                rows_per_batch: 1000,
5022            },
5023            props,
5024        );
5025
5026        assert_eq!(
5027            &row_group_sizes(builder.metadata()),
5028            &[200, 200, 200, 200, 200],
5029            "Row limit should trigger before byte limit"
5030        );
5031    }
5032
5033    #[test]
5034    // When both limits are set, the row limit triggers first
5035    fn test_row_group_limit_both_row_wins_multiple_batches() {
5036        let props = WriterProperties::builder()
5037            .set_max_row_group_row_count(Some(5)) // Will trigger every 5 rows
5038            .set_max_row_group_bytes(Some(9999)) // Won't trigger
5039            .build();
5040
5041        let builder = write_batches(
5042            WriteBatchesShape {
5043                num_batches: 10,
5044                rows_per_batch: 10,
5045                row_size: 100,
5046            },
5047            props,
5048        );
5049
5050        assert_eq!(
5051            &row_group_sizes(builder.metadata()),
5052            &[5; 20],
5053            "Row limit should trigger before byte limit"
5054        );
5055    }
5056
5057    #[test]
5058    // When both limits are set, the byte limit triggers first
5059    fn test_row_group_limit_both_bytes_wins() {
5060        let props = WriterProperties::builder()
5061            .set_max_row_group_row_count(Some(1000)) // Won't trigger for 100 rows
5062            .set_max_row_group_bytes(Some(3500)) // Will trigger at ~30-35 rows
5063            .build();
5064
5065        let builder = write_batches(
5066            WriteBatchesShape {
5067                num_batches: 10,
5068                rows_per_batch: 10,
5069                row_size: 100,
5070            },
5071            props,
5072        );
5073
5074        let sizes = row_group_sizes(builder.metadata());
5075
5076        assert!(
5077            sizes.len() > 1,
5078            "Byte limit should trigger before row limit, got {sizes:?}",
5079        );
5080
5081        assert!(
5082            sizes.iter().all(|&s| s < 1000),
5083            "No row group should hit the row limit"
5084        );
5085
5086        let total_rows: i64 = sizes.iter().sum();
5087        assert_eq!(total_rows, 100, "Total rows should be preserved");
5088    }
5089
5090    #[test]
5091    fn arrow_column_chunk_close_mut_drops_column_index() {
5092        use crate::arrow::ArrowSchemaConverter;
5093        use crate::file::writer::SerializedFileWriter;
5094
5095        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
5096        let props = Arc::new(
5097            WriterProperties::builder()
5098                .set_statistics_enabled(EnabledStatistics::Page)
5099                .build(),
5100        );
5101        let parquet_schema = ArrowSchemaConverter::new()
5102            .with_coerce_types(props.coerce_types())
5103            .convert(&schema)
5104            .unwrap();
5105
5106        let mut buf = Vec::with_capacity(1024);
5107        let mut writer =
5108            SerializedFileWriter::new(&mut buf, parquet_schema.root_schema_ptr(), props.clone())
5109                .unwrap();
5110
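        // Drive the row group writer factory and column writers directly (rather than
        // through ArrowWriter) so the finished chunk can be inspected and mutated
        // before being appended to the file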
5111        let factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(&schema));
5112        let mut col_writers = factory.create_column_writers(0).unwrap();
5113        let arr: ArrayRef = Arc::new(Int32Array::from_iter_values(0..64));
5114        for leaves in compute_leaves(schema.field(0), &arr).unwrap() {
5115            col_writers[0].write(&leaves).unwrap();
5116        }
5117        let mut chunk = col_writers.pop().unwrap().close().unwrap();
5118
5119        // Immutable accessor exposes the close result produced at close time.
5120        assert!(
5121            chunk.close().column_index.is_some(),
5122            "EnabledStatistics::Page should produce a column_index"
5123        );
5124
5125        // Mutable accessor lets callers drop the page-level index before append.
5126        chunk.close_mut().column_index = None;
5127        assert!(chunk.close().column_index.is_none());
5128
5129        let mut rg = writer.next_row_group().unwrap();
5130        chunk.append_to_row_group(&mut rg).unwrap();
5131        rg.close().unwrap();
5132        let file_meta = writer.close().unwrap();
5133
5134        // After dropping column_index, the resulting file records no column
5135        // index offset/length for this chunk.
5136        let cc = file_meta.row_group(0).column(0);
5137        assert!(cc.column_index_range().is_none());
5138    }
5139}