Skip to main content

parquet/arrow/arrow_writer/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains writer which writes arrow data into parquet data.
19
20use crate::column::chunker::ContentDefinedChunker;
21
22use bytes::Bytes;
23use std::io::{Read, Write};
24use std::iter::Peekable;
25use std::slice::Iter;
26use std::sync::{Arc, Mutex};
27use std::vec::IntoIter;
28
29use arrow_array::cast::AsArray;
30use arrow_array::types::*;
31use arrow_array::{ArrayRef, Int32Array, RecordBatch, RecordBatchWriter};
32use arrow_schema::{
33    ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef, TimeUnit,
34};
35
36use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};
37
38use crate::arrow::ArrowSchemaConverter;
39use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
40use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
41use crate::column::page_encryption::PageEncryptor;
42use crate::column::writer::encoder::ColumnValueEncoder;
43use crate::column::writer::{
44    ColumnCloseResult, ColumnWriter, GenericColumnWriter, get_column_writer,
45};
46use crate::data_type::{ByteArray, FixedLenByteArray};
47#[cfg(feature = "encryption")]
48use crate::encryption::encrypt::FileEncryptor;
49use crate::errors::{ParquetError, Result};
50use crate::file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaData};
51use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
52use crate::file::reader::{ChunkReader, Length};
53use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
54use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
55use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
56use levels::{ArrayLevels, calculate_array_levels};
57
58mod byte_array;
59mod levels;
60
61/// Encodes [`RecordBatch`] to parquet
62///
63/// Writes Arrow `RecordBatch`es to a Parquet writer. Multiple [`RecordBatch`] will be encoded
64/// to the same row group, up to `max_row_group_size` rows. Any remaining rows will be
65/// flushed on close, leading the final row group in the output file to potentially
66/// contain fewer than `max_row_group_size` rows
67///
68/// # Example: Writing `RecordBatch`es
69/// ```
70/// # use std::sync::Arc;
71/// # use bytes::Bytes;
72/// # use arrow_array::{ArrayRef, Int64Array};
73/// # use arrow_array::RecordBatch;
74/// # use parquet::arrow::arrow_writer::ArrowWriter;
75/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
76/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
77/// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
78///
79/// let mut buffer = Vec::new();
80/// let mut writer = ArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
81/// writer.write(&to_write).unwrap();
82/// writer.close().unwrap();
83///
84/// let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap();
85/// let read = reader.next().unwrap().unwrap();
86///
87/// assert_eq!(to_write, read);
88/// ```
89///
90/// # Memory Usage and Limiting
91///
92/// The nature of Parquet requires buffering of an entire row group before it can
93/// be flushed to the underlying writer. Data is mostly buffered in its encoded
94/// form, reducing memory usage. However, some data such as dictionary keys,
95/// large strings or very nested data may still result in non-trivial memory
96/// usage.
97///
98/// See Also:
99/// * [`ArrowWriter::memory_size`]: the current memory usage of the writer.
100/// * [`ArrowWriter::in_progress_size`]: Estimated size of the buffered row group,
101///
102/// Call [`Self::flush`] to trigger an early flush of a row group based on a
103/// memory threshold and/or global memory pressure. However,  smaller row groups
104/// result in higher metadata overheads, and thus may worsen compression ratios
105/// and query performance.
106///
107/// ```no_run
108/// # use std::io::Write;
109/// # use arrow_array::RecordBatch;
110/// # use parquet::arrow::ArrowWriter;
111/// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
112/// # let batch: RecordBatch = todo!();
113/// writer.write(&batch).unwrap();
114/// // Trigger an early flush if anticipated size exceeds 1_000_000
115/// if writer.in_progress_size() > 1_000_000 {
116///     writer.flush().unwrap();
117/// }
118/// ```
119///
120/// ## Type Support
121///
122/// The writer supports writing all Arrow [`DataType`]s that have a direct mapping to
123/// Parquet types including  [`StructArray`] and [`ListArray`].
124///
125/// The following are not supported:
126///
127/// * [`IntervalMonthDayNanoArray`]: Parquet does not [support nanosecond intervals].
128///
129/// [`DataType`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html
130/// [`StructArray`]: https://docs.rs/arrow/latest/arrow/array/struct.StructArray.html
131/// [`ListArray`]: https://docs.rs/arrow/latest/arrow/array/type.ListArray.html
132/// [`IntervalMonthDayNanoArray`]: https://docs.rs/arrow/latest/arrow/array/type.IntervalMonthDayNanoArray.html
133/// [support nanosecond intervals]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
134///
135/// ## Type Compatibility
136/// The writer can write Arrow [`RecordBatch`]s that are logically equivalent. This means that for
137/// a  given column, the writer can accept multiple Arrow [`DataType`]s that contain the same
138/// value type.
139///
140/// For example, the following [`DataType`]s are all logically equivalent and can be written
141/// to the same column:
142/// * String, LargeString, StringView
143/// * Binary, LargeBinary, BinaryView
144///
145/// The writer can will also accept both native and dictionary encoded arrays if the dictionaries
146/// contain compatible values.
147/// ```
148/// # use std::sync::Arc;
149/// # use arrow_array::{DictionaryArray, LargeStringArray, RecordBatch, StringArray, UInt8Array};
150/// # use arrow_schema::{DataType, Field, Schema};
151/// # use parquet::arrow::arrow_writer::ArrowWriter;
152/// let record_batch1 = RecordBatch::try_new(
153///    Arc::new(Schema::new(vec![Field::new("col", DataType::LargeUtf8, false)])),
154///    vec![Arc::new(LargeStringArray::from_iter_values(vec!["a", "b"]))]
155///  )
156/// .unwrap();
157///
158/// let mut buffer = Vec::new();
159/// let mut writer = ArrowWriter::try_new(&mut buffer, record_batch1.schema(), None).unwrap();
160/// writer.write(&record_batch1).unwrap();
161///
162/// let record_batch2 = RecordBatch::try_new(
163///     Arc::new(Schema::new(vec![Field::new(
164///         "col",
165///         DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
166///          false,
167///     )])),
168///     vec![Arc::new(DictionaryArray::new(
169///          UInt8Array::from_iter_values(vec![0, 1]),
170///          Arc::new(StringArray::from_iter_values(vec!["b", "c"])),
171///      ))],
172///  )
173///  .unwrap();
174///  writer.write(&record_batch2).unwrap();
175///  writer.close();
176/// ```
177pub struct ArrowWriter<W: Write> {
178    /// Underlying Parquet writer
179    writer: SerializedFileWriter<W>,
180
181    /// The in-progress row group if any
182    in_progress: Option<ArrowRowGroupWriter>,
183
184    /// A copy of the Arrow schema.
185    ///
186    /// The schema is used to verify that each record batch written has the correct schema
187    arrow_schema: SchemaRef,
188
189    /// Creates new [`ArrowRowGroupWriter`] instances as required
190    row_group_writer_factory: ArrowRowGroupWriterFactory,
191
192    /// The maximum number of rows to write to each row group, or None for unlimited
193    max_row_group_row_count: Option<usize>,
194
195    /// The maximum size in bytes for a row group, or None for unlimited
196    max_row_group_bytes: Option<usize>,
197
198    /// CDC chunkers persisted across row groups (one per leaf column).
199    cdc_chunkers: Option<Vec<ContentDefinedChunker>>,
200}
201
202impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
203    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204        let buffered_memory = self.in_progress_size();
205        f.debug_struct("ArrowWriter")
206            .field("writer", &self.writer)
207            .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
208            .field("in_progress_rows", &self.in_progress_rows())
209            .field("arrow_schema", &self.arrow_schema)
210            .field("max_row_group_row_count", &self.max_row_group_row_count)
211            .field("max_row_group_bytes", &self.max_row_group_bytes)
212            .finish()
213    }
214}
215
216impl<W: Write + Send> ArrowWriter<W> {
217    /// Try to create a new Arrow writer
218    ///
219    /// The writer will fail if:
220    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
221    ///  * the Arrow schema contains unsupported datatypes such as Unions
222    pub fn try_new(
223        writer: W,
224        arrow_schema: SchemaRef,
225        props: Option<WriterProperties>,
226    ) -> Result<Self> {
227        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
228        Self::try_new_with_options(writer, arrow_schema, options)
229    }
230
231    /// Try to create a new Arrow writer with [`ArrowWriterOptions`].
232    ///
233    /// The writer will fail if:
234    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
235    ///  * the Arrow schema contains unsupported datatypes such as Unions
236    pub fn try_new_with_options(
237        writer: W,
238        arrow_schema: SchemaRef,
239        options: ArrowWriterOptions,
240    ) -> Result<Self> {
241        let mut props = options.properties;
242
243        let schema = if let Some(parquet_schema) = options.schema_descr {
244            parquet_schema.clone()
245        } else {
246            let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
247            if let Some(schema_root) = &options.schema_root {
248                converter = converter.schema_root(schema_root);
249            }
250
251            converter.convert(&arrow_schema)?
252        };
253
254        if !options.skip_arrow_metadata {
255            // add serialized arrow schema
256            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
257        }
258
259        let max_row_group_row_count = props.max_row_group_row_count();
260        let max_row_group_bytes = props.max_row_group_bytes();
261
262        let props_ptr = Arc::new(props);
263        let file_writer =
264            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;
265
266        let row_group_writer_factory =
267            ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone());
268
269        let cdc_chunkers = props_ptr
270            .content_defined_chunking()
271            .map(|opts| {
272                file_writer
273                    .schema_descr()
274                    .columns()
275                    .iter()
276                    .map(|desc| ContentDefinedChunker::new(desc, opts))
277                    .collect::<Result<Vec<_>>>()
278            })
279            .transpose()?;
280
281        Ok(Self {
282            writer: file_writer,
283            in_progress: None,
284            arrow_schema,
285            row_group_writer_factory,
286            max_row_group_row_count,
287            max_row_group_bytes,
288            cdc_chunkers,
289        })
290    }
291
292    /// Returns metadata for any flushed row groups
293    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
294        self.writer.flushed_row_groups()
295    }
296
297    /// Estimated memory usage, in bytes, of this `ArrowWriter`
298    ///
299    /// This estimate is formed bu summing the values of
300    /// [`ArrowColumnWriter::memory_size`] all in progress columns.
301    pub fn memory_size(&self) -> usize {
302        match &self.in_progress {
303            Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
304            None => 0,
305        }
306    }
307
308    /// Anticipated encoded size of the in progress row group.
309    ///
310    /// This estimate the row group size after being completely encoded is,
311    /// formed by summing the values of
312    /// [`ArrowColumnWriter::get_estimated_total_bytes`] for all in progress
313    /// columns.
314    pub fn in_progress_size(&self) -> usize {
315        match &self.in_progress {
316            Some(in_progress) => in_progress
317                .writers
318                .iter()
319                .map(|x| x.get_estimated_total_bytes())
320                .sum(),
321            None => 0,
322        }
323    }
324
325    /// Returns the number of rows buffered in the in progress row group
326    pub fn in_progress_rows(&self) -> usize {
327        self.in_progress
328            .as_ref()
329            .map(|x| x.buffered_rows)
330            .unwrap_or_default()
331    }
332
333    /// Returns the number of bytes written by this instance
334    pub fn bytes_written(&self) -> usize {
335        self.writer.bytes_written()
336    }
337
338    /// Encodes the provided [`RecordBatch`]
339    ///
340    /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_row_count`]
341    /// rows or [`WriterProperties::max_row_group_bytes`] bytes, the contents of `batch` will be
342    /// written to one or more row groups such that limits are respected.
343    ///
344    /// If both limits are `None`, all data is written to a single row group.
345    /// If one limit is set, that limit is respected.
346    /// If both limits are set, the lower bound (whichever triggers first) is respected.
347    ///
348    /// This will fail if the `batch`'s schema does not match the writer's schema.
349    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
350        if batch.num_rows() == 0 {
351            return Ok(());
352        }
353
354        let in_progress = match &mut self.in_progress {
355            Some(in_progress) => in_progress,
356            x => x.insert(
357                self.row_group_writer_factory
358                    .create_row_group_writer(self.writer.flushed_row_groups().len())?,
359            ),
360        };
361
362        if let Some(max_rows) = self.max_row_group_row_count {
363            if in_progress.buffered_rows + batch.num_rows() > max_rows {
364                let to_write = max_rows - in_progress.buffered_rows;
365                let a = batch.slice(0, to_write);
366                let b = batch.slice(to_write, batch.num_rows() - to_write);
367                self.write(&a)?;
368                return self.write(&b);
369            }
370        }
371
372        // Check byte limit: if we have buffered data, use measured average row size
373        // to split batch proactively before exceeding byte limit
374        if let Some(max_bytes) = self.max_row_group_bytes {
375            if in_progress.buffered_rows > 0 {
376                let current_bytes = in_progress.get_estimated_total_bytes();
377
378                if current_bytes >= max_bytes {
379                    self.flush()?;
380                    return self.write(batch);
381                }
382
383                let avg_row_bytes = current_bytes / in_progress.buffered_rows;
384                if avg_row_bytes > 0 {
385                    // At this point, `current_bytes < max_bytes` (checked above)
386                    let remaining_bytes = max_bytes - current_bytes;
387                    let rows_that_fit = remaining_bytes / avg_row_bytes;
388
389                    if batch.num_rows() > rows_that_fit {
390                        if rows_that_fit > 0 {
391                            let a = batch.slice(0, rows_that_fit);
392                            let b = batch.slice(rows_that_fit, batch.num_rows() - rows_that_fit);
393                            self.write(&a)?;
394                            return self.write(&b);
395                        } else {
396                            self.flush()?;
397                            return self.write(batch);
398                        }
399                    }
400                }
401            }
402        }
403
404        match self.cdc_chunkers.as_mut() {
405            Some(chunkers) => in_progress.write_with_chunkers(batch, chunkers)?,
406            None => in_progress.write(batch)?,
407        }
408
409        let should_flush = self
410            .max_row_group_row_count
411            .is_some_and(|max| in_progress.buffered_rows >= max)
412            || self
413                .max_row_group_bytes
414                .is_some_and(|max| in_progress.get_estimated_total_bytes() >= max);
415
416        if should_flush {
417            self.flush()?
418        }
419        Ok(())
420    }
421
422    /// Writes the given buf bytes to the internal buffer.
423    ///
424    /// It's safe to use this method to write data to the underlying writer,
425    /// because it will ensure that the buffering and byte‐counting layers are used.
426    pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
427        self.writer.write_all(buf)
428    }
429
430    /// Flushes underlying writer
431    pub fn sync(&mut self) -> std::io::Result<()> {
432        self.writer.flush()
433    }
434
435    /// Flushes all buffered rows into a new row group
436    ///
437    /// Note the underlying writer is not flushed with this call.
438    /// If this is a desired behavior, please call [`ArrowWriter::sync`].
439    pub fn flush(&mut self) -> Result<()> {
440        let in_progress = match self.in_progress.take() {
441            Some(in_progress) => in_progress,
442            None => return Ok(()),
443        };
444
445        let mut row_group_writer = self.writer.next_row_group()?;
446        for chunk in in_progress.close()? {
447            chunk.append_to_row_group(&mut row_group_writer)?;
448        }
449        row_group_writer.close()?;
450        Ok(())
451    }
452
453    /// Additional [`KeyValue`] metadata to be written in addition to those from [`WriterProperties`]
454    ///
455    /// This method provide a way to append kv_metadata after write RecordBatch
456    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
457        self.writer.append_key_value_metadata(kv_metadata)
458    }
459
460    /// Returns a reference to the underlying writer.
461    pub fn inner(&self) -> &W {
462        self.writer.inner()
463    }
464
465    /// Returns a mutable reference to the underlying writer.
466    ///
467    /// **Warning**: if you write directly to this writer, you will skip
468    /// the `TrackedWrite` buffering and byte‐counting layers. That’ll cause
469    /// the file footer’s recorded offsets and sizes to diverge from reality,
470    /// resulting in an unreadable or corrupted Parquet file.
471    ///
472    /// If you want to write safely to the underlying writer, use [`Self::write_all`].
473    pub fn inner_mut(&mut self) -> &mut W {
474        self.writer.inner_mut()
475    }
476
477    /// Flushes any outstanding data and returns the underlying writer.
478    pub fn into_inner(mut self) -> Result<W> {
479        self.flush()?;
480        self.writer.into_inner()
481    }
482
483    /// Close and finalize the underlying Parquet writer
484    ///
485    /// Unlike [`Self::close`] this does not consume self
486    ///
487    /// Attempting to write after calling finish will result in an error
488    pub fn finish(&mut self) -> Result<ParquetMetaData> {
489        self.flush()?;
490        self.writer.finish()
491    }
492
493    /// Close and finalize the underlying Parquet writer
494    pub fn close(mut self) -> Result<ParquetMetaData> {
495        self.finish()
496    }
497
498    /// Create a new row group writer and return its column writers.
499    #[deprecated(
500        since = "56.2.0",
501        note = "Use `ArrowRowGroupWriterFactory` instead, see `ArrowColumnWriter` for an example"
502    )]
503    pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
504        self.flush()?;
505        let in_progress = self
506            .row_group_writer_factory
507            .create_row_group_writer(self.writer.flushed_row_groups().len())?;
508        Ok(in_progress.writers)
509    }
510
511    /// Append the given column chunks to the file as a new row group.
512    #[deprecated(
513        since = "56.2.0",
514        note = "Use `SerializedFileWriter` directly instead, see `ArrowColumnWriter` for an example"
515    )]
516    pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
517        let mut row_group_writer = self.writer.next_row_group()?;
518        for chunk in chunks {
519            chunk.append_to_row_group(&mut row_group_writer)?;
520        }
521        row_group_writer.close()?;
522        Ok(())
523    }
524
525    /// Converts this writer into a lower-level [`SerializedFileWriter`] and [`ArrowRowGroupWriterFactory`].
526    ///
527    /// Flushes any outstanding data before returning.
528    ///
529    /// This can be useful to provide more control over how files are written, for example
530    /// to write columns in parallel. See the example on [`ArrowColumnWriter`].
531    pub fn into_serialized_writer(
532        mut self,
533    ) -> Result<(SerializedFileWriter<W>, ArrowRowGroupWriterFactory)> {
534        self.flush()?;
535        Ok((self.writer, self.row_group_writer_factory))
536    }
537}
538
539impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
540    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
541        self.write(batch).map_err(|e| e.into())
542    }
543
544    fn close(self) -> std::result::Result<(), ArrowError> {
545        self.close()?;
546        Ok(())
547    }
548}
549
550/// Arrow-specific configuration settings for writing parquet files.
551///
552/// See [`ArrowWriter`] for how to configure the writer.
553#[derive(Debug, Clone, Default)]
554pub struct ArrowWriterOptions {
555    properties: WriterProperties,
556    skip_arrow_metadata: bool,
557    schema_root: Option<String>,
558    schema_descr: Option<SchemaDescriptor>,
559}
560
561impl ArrowWriterOptions {
562    /// Creates a new [`ArrowWriterOptions`] with the default settings.
563    pub fn new() -> Self {
564        Self::default()
565    }
566
567    /// Sets the [`WriterProperties`] for writing parquet files.
568    pub fn with_properties(self, properties: WriterProperties) -> Self {
569        Self { properties, ..self }
570    }
571
572    /// Skip encoding the embedded arrow metadata (defaults to `false`)
573    ///
574    /// Parquet files generated by the [`ArrowWriter`] contain embedded arrow schema
575    /// by default.
576    ///
577    /// Set `skip_arrow_metadata` to true, to skip encoding the embedded metadata.
578    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
579        Self {
580            skip_arrow_metadata,
581            ..self
582        }
583    }
584
585    /// Set the name of the root parquet schema element (defaults to `"arrow_schema"`)
586    pub fn with_schema_root(self, schema_root: String) -> Self {
587        Self {
588            schema_root: Some(schema_root),
589            ..self
590        }
591    }
592
593    /// Explicitly specify the Parquet schema to be used
594    ///
595    /// If omitted (the default), the [`ArrowSchemaConverter`] is used to compute the
596    /// Parquet [`SchemaDescriptor`]. This may be used When the [`SchemaDescriptor`] is
597    /// already known or must be calculated using custom logic.
598    pub fn with_parquet_schema(self, schema_descr: SchemaDescriptor) -> Self {
599        Self {
600            schema_descr: Some(schema_descr),
601            ..self
602        }
603    }
604}
605
606/// A single column chunk produced by [`ArrowColumnWriter`]
607#[derive(Default)]
608struct ArrowColumnChunkData {
609    length: usize,
610    data: Vec<Bytes>,
611}
612
613impl Length for ArrowColumnChunkData {
614    fn len(&self) -> u64 {
615        self.length as _
616    }
617}
618
619impl ChunkReader for ArrowColumnChunkData {
620    type T = ArrowColumnChunkReader;
621
622    fn get_read(&self, start: u64) -> Result<Self::T> {
623        assert_eq!(start, 0); // Assume append_column writes all data in one-shot
624        Ok(ArrowColumnChunkReader(
625            self.data.clone().into_iter().peekable(),
626        ))
627    }
628
629    fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
630        unimplemented!()
631    }
632}
633
634/// A [`Read`] for [`ArrowColumnChunkData`]
635struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
636
637impl Read for ArrowColumnChunkReader {
638    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
639        let buffer = loop {
640            match self.0.peek_mut() {
641                Some(b) if b.is_empty() => {
642                    self.0.next();
643                    continue;
644                }
645                Some(b) => break b,
646                None => return Ok(0),
647            }
648        };
649
650        let len = buffer.len().min(out.len());
651        let b = buffer.split_to(len);
652        out[..len].copy_from_slice(&b);
653        Ok(len)
654    }
655}
656
657/// A shared [`ArrowColumnChunkData`]
658///
659/// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access via
660/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows
661type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
662
663#[derive(Default)]
664struct ArrowPageWriter {
665    buffer: SharedColumnChunk,
666    #[cfg(feature = "encryption")]
667    page_encryptor: Option<PageEncryptor>,
668}
669
670impl ArrowPageWriter {
671    #[cfg(feature = "encryption")]
672    pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
673        self.page_encryptor = page_encryptor;
674        self
675    }
676
677    #[cfg(feature = "encryption")]
678    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
679        self.page_encryptor.as_mut()
680    }
681
682    #[cfg(not(feature = "encryption"))]
683    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
684        None
685    }
686}
687
688impl PageWriter for ArrowPageWriter {
689    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
690        let page = match self.page_encryptor_mut() {
691            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
692            None => page,
693        };
694
695        let page_header = page.to_thrift_header()?;
696        let header = {
697            let mut header = Vec::with_capacity(1024);
698
699            match self.page_encryptor_mut() {
700                Some(page_encryptor) => {
701                    page_encryptor.encrypt_page_header(&page_header, &mut header)?;
702                    if page.compressed_page().is_data_page() {
703                        page_encryptor.increment_page();
704                    }
705                }
706                None => {
707                    let mut protocol = ThriftCompactOutputProtocol::new(&mut header);
708                    page_header.write_thrift(&mut protocol)?;
709                }
710            };
711
712            Bytes::from(header)
713        };
714
715        let mut buf = self.buffer.try_lock().unwrap();
716
717        let data = page.compressed_page().buffer().clone();
718        let compressed_size = data.len() + header.len();
719
720        let mut spec = PageWriteSpec::new();
721        spec.page_type = page.page_type();
722        spec.num_values = page.num_values();
723        spec.uncompressed_size = page.uncompressed_size() + header.len();
724        spec.offset = buf.length as u64;
725        spec.compressed_size = compressed_size;
726        spec.bytes_written = compressed_size as u64;
727
728        buf.length += compressed_size;
729        buf.data.push(header);
730        buf.data.push(data);
731
732        Ok(spec)
733    }
734
735    fn close(&mut self) -> Result<()> {
736        Ok(())
737    }
738}
739
740/// A leaf column that can be encoded by [`ArrowColumnWriter`]
741#[derive(Debug)]
742pub struct ArrowLeafColumn(ArrayLevels);
743
744/// Computes the [`ArrowLeafColumn`] for a potentially nested [`ArrayRef`]
745///
746/// This function can be used along with [`get_column_writers`] to encode
747/// individual columns in parallel. See example on [`ArrowColumnWriter`]
748pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
749    let levels = calculate_array_levels(array, field)?;
750    Ok(levels.into_iter().map(ArrowLeafColumn).collect())
751}
752
753/// The data for a single column chunk, see [`ArrowColumnWriter`]
754pub struct ArrowColumnChunk {
755    data: ArrowColumnChunkData,
756    close: ColumnCloseResult,
757}
758
759impl std::fmt::Debug for ArrowColumnChunk {
760    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
761        f.debug_struct("ArrowColumnChunk")
762            .field("length", &self.data.length)
763            .finish_non_exhaustive()
764    }
765}
766
767impl ArrowColumnChunk {
768    /// Returns the [`ColumnCloseResult`] produced when the chunk was closed.
769    ///
770    /// Exposes encoding information, collected statistics, and the optional
771    /// [`ColumnIndexMetaData`](crate::file::page_index::column_index::ColumnIndexMetaData)
772    /// / [`OffsetIndexMetaData`](crate::file::page_index::offset_index::OffsetIndexMetaData)
773    /// gathered for the column chunk.
774    pub fn close(&self) -> &ColumnCloseResult {
775        &self.close
776    }
777
778    /// Returns a mutable reference to the [`ColumnCloseResult`].
779    ///
780    /// This allows callers to mutate the close result before the chunk is
781    /// appended to a row group — for example, clearing `column_index` or
782    /// `bloom_filter` based on a dynamic rule that inspects the encodings and
783    /// collected page statistics.
784    pub fn close_mut(&mut self) -> &mut ColumnCloseResult {
785        &mut self.close
786    }
787
788    /// Calls [`SerializedRowGroupWriter::append_column`] with this column's data
789    pub fn append_to_row_group<W: Write + Send>(
790        self,
791        writer: &mut SerializedRowGroupWriter<'_, W>,
792    ) -> Result<()> {
793        writer.append_column(&self.data, self.close)
794    }
795}
796
/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
///
/// `ArrowColumnWriter` instances can be created using an [`ArrowRowGroupWriterFactory`].
///
/// Note: This is a low-level interface for applications that require
/// fine-grained control of encoding (e.g. encoding using multiple threads),
/// see [`ArrowWriter`] for a higher-level interface
///
/// # Example: Encoding two Arrow arrays in parallel
/// ```
/// // The arrow schema
/// # use std::sync::Arc;
/// # use arrow_array::*;
/// # use arrow_schema::*;
/// # use parquet::arrow::ArrowSchemaConverter;
/// # use parquet::arrow::arrow_writer::{compute_leaves, ArrowColumnChunk, ArrowLeafColumn, ArrowRowGroupWriterFactory};
/// # use parquet::file::properties::WriterProperties;
/// # use parquet::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
/// #
/// let schema = Arc::new(Schema::new(vec![
///     Field::new("i32", DataType::Int32, false),
///     Field::new("f32", DataType::Float32, false),
/// ]));
///
/// // Compute the parquet schema
/// let props = Arc::new(WriterProperties::default());
/// let parquet_schema = ArrowSchemaConverter::new()
///   .with_coerce_types(props.coerce_types())
///   .convert(&schema)
///   .unwrap();
///
/// // Create parquet writer
/// let root_schema = parquet_schema.root_schema_ptr();
/// // write to memory in the example, but this could be a File
/// let mut out = Vec::with_capacity(1024);
/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone())
///   .unwrap();
///
/// // Create a factory for building Arrow column writers
/// let row_group_factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(&schema));
/// // Create column writers for the 0th row group
/// let col_writers = row_group_factory.create_column_writers(0).unwrap();
///
/// // Spawn a worker thread for each column
/// //
/// // Note: This is for demonstration purposes, a thread-pool e.g. rayon or tokio, would be better.
/// // The `map` produces an iterator of type `tuple of (thread handle, send channel)`.
/// let mut workers: Vec<_> = col_writers
///     .into_iter()
///     .map(|mut col_writer| {
///         let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
///         let handle = std::thread::spawn(move || {
///             // receive Arrays to encode via the channel
///             for col in recv {
///                 col_writer.write(&col)?;
///             }
///             // once the input is complete, close the writer
///             // to return the newly created ArrowColumnChunk
///             col_writer.close()
///         });
///         (handle, send)
///     })
///     .collect();
///
/// // Start row group
/// let mut row_group_writer: SerializedRowGroupWriter<'_, _> = writer
///   .next_row_group()
///   .unwrap();
///
/// // Create some example input columns to encode
/// let to_write = vec![
///     Arc::new(Int32Array::from_iter_values([1, 2, 3])) as _,
///     Arc::new(Float32Array::from_iter_values([1., 45., -1.])) as _,
/// ];
///
/// // Send the input columns to the workers
/// let mut worker_iter = workers.iter_mut();
/// for (arr, field) in to_write.iter().zip(&schema.fields) {
///     for leaves in compute_leaves(field, arr).unwrap() {
///         worker_iter.next().unwrap().1.send(leaves).unwrap();
///     }
/// }
///
/// // Wait for the workers to complete encoding, and append
/// // the resulting column chunks to the row group (and the file)
/// for (handle, send) in workers {
///     drop(send); // Drop send side to signal termination
///     // wait for the worker to send the completed chunk
///     let chunk: ArrowColumnChunk = handle.join().unwrap().unwrap();
///     chunk.append_to_row_group(&mut row_group_writer).unwrap();
/// }
/// // Close the row group which writes to the underlying file
/// row_group_writer.close().unwrap();
///
/// let metadata = writer.close().unwrap();
/// assert_eq!(metadata.file_metadata().num_rows(), 3);
/// ```
pub struct ArrowColumnWriter {
    /// The underlying parquet column writer (generic or byte-array specific)
    writer: ArrowColumnWriterImpl,
    /// Buffer shared with the page writer owned by `writer`; it receives the encoded
    /// pages and is unwrapped into the [`ArrowColumnChunk`] on close
    chunk: SharedColumnChunk,
}
898
899impl std::fmt::Debug for ArrowColumnWriter {
900    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
901        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
902    }
903}
904
905enum ArrowColumnWriterImpl {
906    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
907    Column(ColumnWriter<'static>),
908}
909
910impl ArrowColumnWriter {
911    /// Write an [`ArrowLeafColumn`]
912    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
913        self.write_internal(&col.0)
914    }
915
916    /// Write with content-defined chunking, inserting page flushes at chunk boundaries.
917    fn write_with_chunker(
918        &mut self,
919        col: &ArrowLeafColumn,
920        chunker: &mut ContentDefinedChunker,
921    ) -> Result<()> {
922        let levels = &col.0;
923        let chunks = chunker.get_arrow_chunks(
924            levels.def_level_data().as_ref(),
925            levels.rep_level_data().as_ref(),
926            levels.array(),
927        )?;
928
929        let num_chunks = chunks.len();
930        for (i, chunk) in chunks.iter().enumerate() {
931            let chunk_levels = levels.slice_for_chunk(chunk);
932            self.write_internal(&chunk_levels)?;
933
934            // Add a page break after each chunk except the last
935            if i + 1 < num_chunks {
936                match &mut self.writer {
937                    ArrowColumnWriterImpl::Column(c) => c.add_data_page()?,
938                    ArrowColumnWriterImpl::ByteArray(c) => c.add_data_page()?,
939                }
940            }
941        }
942        Ok(())
943    }
944
945    fn write_internal(&mut self, levels: &ArrayLevels) -> Result<()> {
946        match &mut self.writer {
947            ArrowColumnWriterImpl::Column(c) => {
948                let leaf = levels.array();
949                match leaf.as_any_dictionary_opt() {
950                    Some(dictionary) => {
951                        let materialized =
952                            arrow_select::take::take(dictionary.values(), dictionary.keys(), None)?;
953                        write_leaf(c, &materialized, levels)?
954                    }
955                    None => write_leaf(c, leaf, levels)?,
956                };
957            }
958            ArrowColumnWriterImpl::ByteArray(c) => {
959                write_primitive(c, levels.array().as_ref(), levels)?;
960            }
961        }
962        Ok(())
963    }
964
965    /// Close this column returning the written [`ArrowColumnChunk`]
966    pub fn close(self) -> Result<ArrowColumnChunk> {
967        let close = match self.writer {
968            ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
969            ArrowColumnWriterImpl::Column(c) => c.close()?,
970        };
971        let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
972        let data = chunk.into_inner().unwrap();
973        Ok(ArrowColumnChunk { data, close })
974    }
975
976    /// Returns the estimated total memory usage by the writer.
977    ///
978    /// This  [`Self::get_estimated_total_bytes`] this is an estimate
979    /// of the current memory usage and not it's anticipated encoded size.
980    ///
981    /// This includes:
982    /// 1. Data buffered in encoded form
983    /// 2. Data buffered in un-encoded form (e.g. `usize` dictionary keys)
984    ///
985    /// This value should be greater than or equal to [`Self::get_estimated_total_bytes`]
986    pub fn memory_size(&self) -> usize {
987        match &self.writer {
988            ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
989            ArrowColumnWriterImpl::Column(c) => c.memory_size(),
990        }
991    }
992
993    /// Returns the estimated total encoded bytes for this column writer.
994    ///
995    /// This includes:
996    /// 1. Data buffered in encoded form
997    /// 2. An estimate of how large the data buffered in un-encoded form would be once encoded
998    ///
999    /// This value should be less than or equal to [`Self::memory_size`]
1000    pub fn get_estimated_total_bytes(&self) -> usize {
1001        match &self.writer {
1002            ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
1003            ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
1004        }
1005    }
1006}
1007
/// Encodes [`RecordBatch`] to a parquet row group
///
/// Note: this structure is created internally by [`ArrowRowGroupWriterFactory`] and is
/// not exposed publicly.
///
/// See the example on [`ArrowColumnWriter`] for how to encode columns in parallel
#[derive(Debug)]
struct ArrowRowGroupWriter {
    /// One column writer per parquet leaf column, in schema order
    writers: Vec<ArrowColumnWriter>,
    /// Arrow schema whose fields are matched against the columns of written batches
    schema: SchemaRef,
    /// Total number of rows passed to this writer so far
    buffered_rows: usize,
}
1020
1021impl ArrowRowGroupWriter {
1022    fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
1023        Self {
1024            writers,
1025            schema: arrow.clone(),
1026            buffered_rows: 0,
1027        }
1028    }
1029
1030    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
1031        self.buffered_rows += batch.num_rows();
1032        let mut writers = self.writers.iter_mut();
1033        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
1034            for leaf in compute_leaves(field.as_ref(), column)? {
1035                writers.next().unwrap().write(&leaf)?;
1036            }
1037        }
1038        Ok(())
1039    }
1040
1041    fn write_with_chunkers(
1042        &mut self,
1043        batch: &RecordBatch,
1044        chunkers: &mut [ContentDefinedChunker],
1045    ) -> Result<()> {
1046        self.buffered_rows += batch.num_rows();
1047        let mut writers = self.writers.iter_mut();
1048        let mut chunkers = chunkers.iter_mut();
1049        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
1050            for leaf in compute_leaves(field.as_ref(), column)? {
1051                writers
1052                    .next()
1053                    .unwrap()
1054                    .write_with_chunker(&leaf, chunkers.next().unwrap())?;
1055            }
1056        }
1057        Ok(())
1058    }
1059
1060    /// Returns the estimated total encoded bytes for this row group
1061    fn get_estimated_total_bytes(&self) -> usize {
1062        self.writers
1063            .iter()
1064            .map(|x| x.get_estimated_total_bytes())
1065            .sum()
1066    }
1067
1068    fn close(self) -> Result<Vec<ArrowColumnChunk>> {
1069        self.writers
1070            .into_iter()
1071            .map(|writer| writer.close())
1072            .collect()
1073    }
1074}
1075
/// Factory that creates new column writers for each row group in the Parquet file.
///
/// You can create this structure via [`ArrowWriter::into_serialized_writer`] or
/// [`ArrowRowGroupWriterFactory::new`].
/// See the example on [`ArrowColumnWriter`] for how to encode columns in parallel
#[derive(Debug)]
pub struct ArrowRowGroupWriterFactory {
    /// Parquet schema of the file; its leaf columns drive writer creation
    schema: SchemaDescPtr,
    /// Arrow schema of the data being written
    arrow_schema: SchemaRef,
    /// Writer properties shared with the file writer
    props: WriterPropertiesPtr,
    /// Optional file encryptor used to build per-column page encryptors
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
1088
1089impl ArrowRowGroupWriterFactory {
1090    /// Create a new [`ArrowRowGroupWriterFactory`] for the provided file writer and Arrow schema
1091    pub fn new<W: Write + Send>(
1092        file_writer: &SerializedFileWriter<W>,
1093        arrow_schema: SchemaRef,
1094    ) -> Self {
1095        let schema = Arc::clone(file_writer.schema_descr_ptr());
1096        let props = Arc::clone(file_writer.properties());
1097        Self {
1098            schema,
1099            arrow_schema,
1100            props,
1101            #[cfg(feature = "encryption")]
1102            file_encryptor: file_writer.file_encryptor(),
1103        }
1104    }
1105
1106    fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
1107        let writers = self.create_column_writers(row_group_index)?;
1108        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
1109    }
1110
1111    /// Create column writers for a new row group, with the given row group index
1112    pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
1113        let mut writers = Vec::with_capacity(self.arrow_schema.fields.len());
1114        let mut leaves = self.schema.columns().iter();
1115        let column_factory = self.column_writer_factory(row_group_index);
1116        for field in &self.arrow_schema.fields {
1117            column_factory.get_arrow_column_writer(
1118                field.data_type(),
1119                &self.props,
1120                &mut leaves,
1121                &mut writers,
1122            )?;
1123        }
1124        Ok(writers)
1125    }
1126
1127    #[cfg(feature = "encryption")]
1128    fn column_writer_factory(&self, row_group_idx: usize) -> ArrowColumnWriterFactory {
1129        ArrowColumnWriterFactory::new()
1130            .with_file_encryptor(row_group_idx, self.file_encryptor.clone())
1131    }
1132
1133    #[cfg(not(feature = "encryption"))]
1134    fn column_writer_factory(&self, _row_group_idx: usize) -> ArrowColumnWriterFactory {
1135        ArrowColumnWriterFactory::new()
1136    }
1137}
1138
1139/// Returns [`ArrowColumnWriter`]s for each column in a given schema
1140#[deprecated(since = "57.0.0", note = "Use `ArrowRowGroupWriterFactory` instead")]
1141pub fn get_column_writers(
1142    parquet: &SchemaDescriptor,
1143    props: &WriterPropertiesPtr,
1144    arrow: &SchemaRef,
1145) -> Result<Vec<ArrowColumnWriter>> {
1146    let mut writers = Vec::with_capacity(arrow.fields.len());
1147    let mut leaves = parquet.columns().iter();
1148    let column_factory = ArrowColumnWriterFactory::new();
1149    for field in &arrow.fields {
1150        column_factory.get_arrow_column_writer(
1151            field.data_type(),
1152            props,
1153            &mut leaves,
1154            &mut writers,
1155        )?;
1156    }
1157    Ok(writers)
1158}
1159
/// Creates [`ArrowColumnWriter`] instances.
///
/// With the `encryption` feature enabled, the factory also carries what is needed to
/// build a page encryptor for each column it creates.
struct ArrowColumnWriterFactory {
    /// Optional file encryptor passed to `PageEncryptor::create_if_column_encrypted`
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
    /// Index of the row group whose column writers are being created
    #[cfg(feature = "encryption")]
    row_group_index: usize,
}
1167
1168impl ArrowColumnWriterFactory {
1169    pub fn new() -> Self {
1170        Self {
1171            #[cfg(feature = "encryption")]
1172            row_group_index: 0,
1173            #[cfg(feature = "encryption")]
1174            file_encryptor: None,
1175        }
1176    }
1177
1178    #[cfg(feature = "encryption")]
1179    pub fn with_file_encryptor(
1180        mut self,
1181        row_group_index: usize,
1182        file_encryptor: Option<Arc<FileEncryptor>>,
1183    ) -> Self {
1184        self.row_group_index = row_group_index;
1185        self.file_encryptor = file_encryptor;
1186        self
1187    }
1188
1189    #[cfg(feature = "encryption")]
1190    fn create_page_writer(
1191        &self,
1192        column_descriptor: &ColumnDescPtr,
1193        column_index: usize,
1194    ) -> Result<Box<ArrowPageWriter>> {
1195        let column_path = column_descriptor.path().string();
1196        let page_encryptor = PageEncryptor::create_if_column_encrypted(
1197            &self.file_encryptor,
1198            self.row_group_index,
1199            column_index,
1200            &column_path,
1201        )?;
1202        Ok(Box::new(
1203            ArrowPageWriter::default().with_encryptor(page_encryptor),
1204        ))
1205    }
1206
1207    #[cfg(not(feature = "encryption"))]
1208    fn create_page_writer(
1209        &self,
1210        _column_descriptor: &ColumnDescPtr,
1211        _column_index: usize,
1212    ) -> Result<Box<ArrowPageWriter>> {
1213        Ok(Box::<ArrowPageWriter>::default())
1214    }
1215
1216    /// Gets an [`ArrowColumnWriter`] for the given `data_type`, appending the
1217    /// output ColumnDesc to `leaves` and the column writers to `out`
1218    fn get_arrow_column_writer(
1219        &self,
1220        data_type: &ArrowDataType,
1221        props: &WriterPropertiesPtr,
1222        leaves: &mut Iter<'_, ColumnDescPtr>,
1223        out: &mut Vec<ArrowColumnWriter>,
1224    ) -> Result<()> {
1225        // Instantiate writers for normal columns
1226        let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1227            let page_writer = self.create_page_writer(desc, out.len())?;
1228            let chunk = page_writer.buffer.clone();
1229            let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
1230            Ok(ArrowColumnWriter {
1231                chunk,
1232                writer: ArrowColumnWriterImpl::Column(writer),
1233            })
1234        };
1235
1236        // Instantiate writers for byte arrays (e.g. Utf8,  Binary, etc)
1237        let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1238            let page_writer = self.create_page_writer(desc, out.len())?;
1239            let chunk = page_writer.buffer.clone();
1240            let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
1241            Ok(ArrowColumnWriter {
1242                chunk,
1243                writer: ArrowColumnWriterImpl::ByteArray(writer),
1244            })
1245        };
1246
1247        match data_type {
1248            _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
1249            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
1250                out.push(col(leaves.next().unwrap())?)
1251            }
1252            ArrowDataType::LargeBinary
1253            | ArrowDataType::Binary
1254            | ArrowDataType::Utf8
1255            | ArrowDataType::LargeUtf8
1256            | ArrowDataType::BinaryView
1257            | ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
1258            ArrowDataType::List(f)
1259            | ArrowDataType::LargeList(f)
1260            | ArrowDataType::FixedSizeList(f, _)
1261            | ArrowDataType::ListView(f)
1262            | ArrowDataType::LargeListView(f) => {
1263                self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
1264            }
1265            ArrowDataType::Struct(fields) => {
1266                for field in fields {
1267                    self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
1268                }
1269            }
1270            ArrowDataType::Map(f, _) => match f.data_type() {
1271                ArrowDataType::Struct(f) => {
1272                    self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
1273                    self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
1274                }
1275                _ => unreachable!("invalid map type"),
1276            },
1277            ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1278                ArrowDataType::Utf8
1279                | ArrowDataType::LargeUtf8
1280                | ArrowDataType::Binary
1281                | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
1282                ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
1283                    out.push(bytes(leaves.next().unwrap())?)
1284                }
1285                ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
1286                _ => out.push(col(leaves.next().unwrap())?),
1287            },
1288            _ => {
1289                return Err(ParquetError::NYI(format!(
1290                    "Attempting to write an Arrow type {data_type} to parquet that is not yet implemented"
1291                )));
1292            }
1293        }
1294        Ok(())
1295    }
1296}
1297
1298fn write_leaf(
1299    writer: &mut ColumnWriter<'_>,
1300    column: &dyn arrow_array::Array,
1301    levels: &ArrayLevels,
1302) -> Result<usize> {
1303    let indices = levels.non_null_indices();
1304
1305    match writer {
1306        // Note: this should match the contents of arrow_to_parquet_type
1307        ColumnWriter::Int32ColumnWriter(typed) => {
1308            match column.data_type() {
1309                ArrowDataType::Null => {
1310                    let array = Int32Array::new_null(column.len());
1311                    write_primitive(typed, array.values(), levels)
1312                }
1313                ArrowDataType::Int8 => {
1314                    let array: Int32Array = column.as_primitive::<Int8Type>().unary(|x| x as i32);
1315                    write_primitive(typed, array.values(), levels)
1316                }
1317                ArrowDataType::Int16 => {
1318                    let array: Int32Array = column.as_primitive::<Int16Type>().unary(|x| x as i32);
1319                    write_primitive(typed, array.values(), levels)
1320                }
1321                ArrowDataType::Int32 => {
1322                    write_primitive(typed, column.as_primitive::<Int32Type>().values(), levels)
1323                }
1324                ArrowDataType::UInt8 => {
1325                    let array: Int32Array = column.as_primitive::<UInt8Type>().unary(|x| x as i32);
1326                    write_primitive(typed, array.values(), levels)
1327                }
1328                ArrowDataType::UInt16 => {
1329                    let array: Int32Array = column.as_primitive::<UInt16Type>().unary(|x| x as i32);
1330                    write_primitive(typed, array.values(), levels)
1331                }
1332                ArrowDataType::UInt32 => {
1333                    // follow C++ implementation and use overflow/reinterpret cast from  u32 to i32 which will map
1334                    // `(i32::MAX as u32)..u32::MAX` to `i32::MIN..0`
1335                    let array = column.as_primitive::<UInt32Type>();
1336                    write_primitive(typed, array.values().inner().typed_data(), levels)
1337                }
1338                ArrowDataType::Date32 => {
1339                    let array = column.as_primitive::<Date32Type>();
1340                    write_primitive(typed, array.values(), levels)
1341                }
1342                ArrowDataType::Time32(TimeUnit::Second) => {
1343                    let array = column.as_primitive::<Time32SecondType>();
1344                    write_primitive(typed, array.values(), levels)
1345                }
1346                ArrowDataType::Time32(TimeUnit::Millisecond) => {
1347                    let array = column.as_primitive::<Time32MillisecondType>();
1348                    write_primitive(typed, array.values(), levels)
1349                }
1350                ArrowDataType::Date64 => {
1351                    // If the column is a Date64, we truncate it
1352                    let array: Int32Array = column
1353                        .as_primitive::<Date64Type>()
1354                        .unary(|x| (x / 86_400_000) as _);
1355
1356                    write_primitive(typed, array.values(), levels)
1357                }
1358                ArrowDataType::Decimal32(_, _) => {
1359                    let array = column
1360                        .as_primitive::<Decimal32Type>()
1361                        .unary::<_, Int32Type>(|v| v);
1362                    write_primitive(typed, array.values(), levels)
1363                }
1364                ArrowDataType::Decimal64(_, _) => {
1365                    // use the int32 to represent the decimal with low precision
1366                    let array = column
1367                        .as_primitive::<Decimal64Type>()
1368                        .unary::<_, Int32Type>(|v| v as i32);
1369                    write_primitive(typed, array.values(), levels)
1370                }
1371                ArrowDataType::Decimal128(_, _) => {
1372                    // use the int32 to represent the decimal with low precision
1373                    let array = column
1374                        .as_primitive::<Decimal128Type>()
1375                        .unary::<_, Int32Type>(|v| v as i32);
1376                    write_primitive(typed, array.values(), levels)
1377                }
1378                ArrowDataType::Decimal256(_, _) => {
1379                    // use the int32 to represent the decimal with low precision
1380                    let array = column
1381                        .as_primitive::<Decimal256Type>()
1382                        .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1383                    write_primitive(typed, array.values(), levels)
1384                }
1385                d => Err(ParquetError::General(format!("Cannot coerce {d} to I32"))),
1386            }
1387        }
1388        ColumnWriter::BoolColumnWriter(typed) => {
1389            let array = column.as_boolean();
1390            let values = get_bool_array_slice(array, indices.iter().copied());
1391            typed.write_batch_internal(
1392                values.as_slice(),
1393                None,
1394                levels.def_level_data().as_ref(),
1395                levels.rep_level_data().as_ref(),
1396                None,
1397                None,
1398                None,
1399            )
1400        }
1401        ColumnWriter::Int64ColumnWriter(typed) => {
1402            match column.data_type() {
1403                ArrowDataType::Date64 => {
1404                    let array = column
1405                        .as_primitive::<Date64Type>()
1406                        .reinterpret_cast::<Int64Type>();
1407
1408                    write_primitive(typed, array.values(), levels)
1409                }
1410                ArrowDataType::Int64 => {
1411                    let array = column.as_primitive::<Int64Type>();
1412                    write_primitive(typed, array.values(), levels)
1413                }
1414                ArrowDataType::UInt64 => {
1415                    let values = column.as_primitive::<UInt64Type>().values();
1416                    // follow C++ implementation and use overflow/reinterpret cast from  u64 to i64 which will map
1417                    // `(i64::MAX as u64)..u64::MAX` to `i64::MIN..0`
1418                    let array = values.inner().typed_data::<i64>();
1419                    write_primitive(typed, array, levels)
1420                }
1421                ArrowDataType::Time64(TimeUnit::Microsecond) => {
1422                    let array = column.as_primitive::<Time64MicrosecondType>();
1423                    write_primitive(typed, array.values(), levels)
1424                }
1425                ArrowDataType::Time64(TimeUnit::Nanosecond) => {
1426                    let array = column.as_primitive::<Time64NanosecondType>();
1427                    write_primitive(typed, array.values(), levels)
1428                }
1429                ArrowDataType::Timestamp(unit, _) => match unit {
1430                    TimeUnit::Second => {
1431                        let array = column.as_primitive::<TimestampSecondType>();
1432                        write_primitive(typed, array.values(), levels)
1433                    }
1434                    TimeUnit::Millisecond => {
1435                        let array = column.as_primitive::<TimestampMillisecondType>();
1436                        write_primitive(typed, array.values(), levels)
1437                    }
1438                    TimeUnit::Microsecond => {
1439                        let array = column.as_primitive::<TimestampMicrosecondType>();
1440                        write_primitive(typed, array.values(), levels)
1441                    }
1442                    TimeUnit::Nanosecond => {
1443                        let array = column.as_primitive::<TimestampNanosecondType>();
1444                        write_primitive(typed, array.values(), levels)
1445                    }
1446                },
1447                ArrowDataType::Duration(unit) => match unit {
1448                    TimeUnit::Second => {
1449                        let array = column.as_primitive::<DurationSecondType>();
1450                        write_primitive(typed, array.values(), levels)
1451                    }
1452                    TimeUnit::Millisecond => {
1453                        let array = column.as_primitive::<DurationMillisecondType>();
1454                        write_primitive(typed, array.values(), levels)
1455                    }
1456                    TimeUnit::Microsecond => {
1457                        let array = column.as_primitive::<DurationMicrosecondType>();
1458                        write_primitive(typed, array.values(), levels)
1459                    }
1460                    TimeUnit::Nanosecond => {
1461                        let array = column.as_primitive::<DurationNanosecondType>();
1462                        write_primitive(typed, array.values(), levels)
1463                    }
1464                },
1465                ArrowDataType::Decimal64(_, _) => {
1466                    let array = column
1467                        .as_primitive::<Decimal64Type>()
1468                        .reinterpret_cast::<Int64Type>();
1469                    write_primitive(typed, array.values(), levels)
1470                }
1471                ArrowDataType::Decimal128(_, _) => {
1472                    // use the int64 to represent the decimal with low precision
1473                    let array = column
1474                        .as_primitive::<Decimal128Type>()
1475                        .unary::<_, Int64Type>(|v| v as i64);
1476                    write_primitive(typed, array.values(), levels)
1477                }
1478                ArrowDataType::Decimal256(_, _) => {
1479                    // use the int64 to represent the decimal with low precision
1480                    let array = column
1481                        .as_primitive::<Decimal256Type>()
1482                        .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1483                    write_primitive(typed, array.values(), levels)
1484                }
1485                d => Err(ParquetError::General(format!("Cannot coerce {d} to I64"))),
1486            }
1487        }
1488        ColumnWriter::Int96ColumnWriter(_typed) => {
1489            unreachable!("Currently unreachable because data type not supported")
1490        }
1491        ColumnWriter::FloatColumnWriter(typed) => {
1492            let array = column.as_primitive::<Float32Type>();
1493            write_primitive(typed, array.values(), levels)
1494        }
1495        ColumnWriter::DoubleColumnWriter(typed) => {
1496            let array = column.as_primitive::<Float64Type>();
1497            write_primitive(typed, array.values(), levels)
1498        }
1499        ColumnWriter::ByteArrayColumnWriter(_) => {
1500            unreachable!("should use ByteArrayWriter")
1501        }
1502        ColumnWriter::FixedLenByteArrayColumnWriter(typed) => {
1503            let bytes = match column.data_type() {
1504                ArrowDataType::Interval(interval_unit) => match interval_unit {
1505                    IntervalUnit::YearMonth => {
1506                        let array = column.as_primitive::<IntervalYearMonthType>();
1507                        get_interval_ym_array_slice(array, indices.iter().copied())
1508                    }
1509                    IntervalUnit::DayTime => {
1510                        let array = column.as_primitive::<IntervalDayTimeType>();
1511                        get_interval_dt_array_slice(array, indices.iter().copied())
1512                    }
1513                    _ => {
1514                        return Err(ParquetError::NYI(format!(
1515                            "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
1516                        )));
1517                    }
1518                },
1519                ArrowDataType::FixedSizeBinary(_) => {
1520                    let array = column.as_fixed_size_binary();
1521                    get_fsb_array_slice(array, indices.iter().copied())
1522                }
1523                ArrowDataType::Decimal32(_, _) => {
1524                    let array = column.as_primitive::<Decimal32Type>();
1525                    get_decimal_32_array_slice(array, indices.iter().copied())
1526                }
1527                ArrowDataType::Decimal64(_, _) => {
1528                    let array = column.as_primitive::<Decimal64Type>();
1529                    get_decimal_64_array_slice(array, indices.iter().copied())
1530                }
1531                ArrowDataType::Decimal128(_, _) => {
1532                    let array = column.as_primitive::<Decimal128Type>();
1533                    get_decimal_128_array_slice(array, indices.iter().copied())
1534                }
1535                ArrowDataType::Decimal256(_, _) => {
1536                    let array = column.as_primitive::<Decimal256Type>();
1537                    get_decimal_256_array_slice(array, indices.iter().copied())
1538                }
1539                ArrowDataType::Float16 => {
1540                    let array = column.as_primitive::<Float16Type>();
1541                    get_float_16_array_slice(array, indices.iter().copied())
1542                }
1543                _ => {
1544                    return Err(ParquetError::NYI(
1545                        "Attempting to write an Arrow type that is not yet implemented".to_string(),
1546                    ));
1547                }
1548            };
1549            typed.write_batch_internal(
1550                bytes.as_slice(),
1551                None,
1552                levels.def_level_data().as_ref(),
1553                levels.rep_level_data().as_ref(),
1554                None,
1555                None,
1556                None,
1557            )
1558        }
1559    }
1560}
1561
1562fn write_primitive<E: ColumnValueEncoder>(
1563    writer: &mut GenericColumnWriter<E>,
1564    values: &E::Values,
1565    levels: &ArrayLevels,
1566) -> Result<usize> {
1567    writer.write_batch_internal(
1568        values,
1569        Some(levels.non_null_indices()),
1570        levels.def_level_data().as_ref(),
1571        levels.rep_level_data().as_ref(),
1572        None,
1573        None,
1574        None,
1575    )
1576}
1577
1578fn get_bool_array_slice(
1579    array: &arrow_array::BooleanArray,
1580    indices: impl ExactSizeIterator<Item = usize>,
1581) -> Vec<bool> {
1582    let mut values = Vec::with_capacity(indices.len());
1583    for i in indices {
1584        values.push(array.value(i))
1585    }
1586    values
1587}
1588
1589/// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each).
1590/// An Arrow YearMonth interval only stores months, thus only the first 4 bytes are populated.
1591fn get_interval_ym_array_slice(
1592    array: &arrow_array::IntervalYearMonthArray,
1593    indices: impl ExactSizeIterator<Item = usize>,
1594) -> Vec<FixedLenByteArray> {
1595    let mut values = Vec::with_capacity(indices.len());
1596    for i in indices {
1597        let mut value = array.value(i).to_le_bytes().to_vec();
1598        let mut suffix = vec![0; 8];
1599        value.append(&mut suffix);
1600        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1601    }
1602    values
1603}
1604
1605/// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each).
1606/// An Arrow DayTime interval only stores days and millis, thus the first 4 bytes are not populated.
1607fn get_interval_dt_array_slice(
1608    array: &arrow_array::IntervalDayTimeArray,
1609    indices: impl ExactSizeIterator<Item = usize>,
1610) -> Vec<FixedLenByteArray> {
1611    let mut values = Vec::with_capacity(indices.len());
1612    for i in indices {
1613        let mut out = [0; 12];
1614        let value = array.value(i);
1615        out[4..8].copy_from_slice(&value.days.to_le_bytes());
1616        out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1617        values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1618    }
1619    values
1620}
1621
1622fn get_decimal_32_array_slice(
1623    array: &arrow_array::Decimal32Array,
1624    indices: impl ExactSizeIterator<Item = usize>,
1625) -> Vec<FixedLenByteArray> {
1626    let mut values = Vec::with_capacity(indices.len());
1627    let size = decimal_length_from_precision(array.precision());
1628    for i in indices {
1629        let as_be_bytes = array.value(i).to_be_bytes();
1630        let resized_value = as_be_bytes[(4 - size)..].to_vec();
1631        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1632    }
1633    values
1634}
1635
1636fn get_decimal_64_array_slice(
1637    array: &arrow_array::Decimal64Array,
1638    indices: impl ExactSizeIterator<Item = usize>,
1639) -> Vec<FixedLenByteArray> {
1640    let mut values = Vec::with_capacity(indices.len());
1641    let size = decimal_length_from_precision(array.precision());
1642    for i in indices {
1643        let as_be_bytes = array.value(i).to_be_bytes();
1644        let resized_value = as_be_bytes[(8 - size)..].to_vec();
1645        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1646    }
1647    values
1648}
1649
1650fn get_decimal_128_array_slice(
1651    array: &arrow_array::Decimal128Array,
1652    indices: impl ExactSizeIterator<Item = usize>,
1653) -> Vec<FixedLenByteArray> {
1654    let mut values = Vec::with_capacity(indices.len());
1655    let size = decimal_length_from_precision(array.precision());
1656    for i in indices {
1657        let as_be_bytes = array.value(i).to_be_bytes();
1658        let resized_value = as_be_bytes[(16 - size)..].to_vec();
1659        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1660    }
1661    values
1662}
1663
1664fn get_decimal_256_array_slice(
1665    array: &arrow_array::Decimal256Array,
1666    indices: impl ExactSizeIterator<Item = usize>,
1667) -> Vec<FixedLenByteArray> {
1668    let mut values = Vec::with_capacity(indices.len());
1669    let size = decimal_length_from_precision(array.precision());
1670    for i in indices {
1671        let as_be_bytes = array.value(i).to_be_bytes();
1672        let resized_value = as_be_bytes[(32 - size)..].to_vec();
1673        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1674    }
1675    values
1676}
1677
1678fn get_float_16_array_slice(
1679    array: &arrow_array::Float16Array,
1680    indices: impl ExactSizeIterator<Item = usize>,
1681) -> Vec<FixedLenByteArray> {
1682    let mut values = Vec::with_capacity(indices.len());
1683    for i in indices {
1684        let value = array.value(i).to_le_bytes().to_vec();
1685        values.push(FixedLenByteArray::from(ByteArray::from(value)));
1686    }
1687    values
1688}
1689
1690fn get_fsb_array_slice(
1691    array: &arrow_array::FixedSizeBinaryArray,
1692    indices: impl ExactSizeIterator<Item = usize>,
1693) -> Vec<FixedLenByteArray> {
1694    let mut values = Vec::with_capacity(indices.len());
1695    for i in indices {
1696        let value = array.value(i).to_vec();
1697        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1698    }
1699    values
1700}
1701
1702#[cfg(test)]
1703mod tests {
1704    use super::*;
1705    use std::collections::HashMap;
1706
1707    use std::fs::File;
1708
1709    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1710    use crate::arrow::{ARROW_SCHEMA_META_KEY, PARQUET_FIELD_ID_META_KEY};
1711    use crate::column::page::{Page, PageReader};
1712    use crate::file::metadata::thrift::PageHeader;
1713    use crate::file::page_index::column_index::ColumnIndexMetaData;
1714    use crate::file::reader::SerializedPageReader;
1715    use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
1716    use crate::schema::types::ColumnPath;
1717    use arrow::datatypes::ToByteSlice;
1718    use arrow::datatypes::{DataType, Schema};
1719    use arrow::error::Result as ArrowResult;
1720    use arrow::util::data_gen::create_random_array;
1721    use arrow::util::pretty::pretty_format_batches;
1722    use arrow::{array::*, buffer::Buffer};
1723    use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer, OffsetBuffer, i256};
1724    use arrow_schema::Fields;
1725    use half::f16;
1726    use num_traits::{FromPrimitive, ToPrimitive};
1727    use tempfile::tempfile;
1728
1729    use crate::basic::Encoding;
1730    use crate::data_type::AsBytes;
1731    use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
1732    use crate::file::properties::{
1733        BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1734    };
1735    use crate::file::serialized_reader::ReadOptionsBuilder;
1736    use crate::file::{
1737        reader::{FileReader, SerializedFileReader},
1738        statistics::Statistics,
1739    };
1740
1741    #[test]
1742    fn arrow_writer() {
1743        // define schema
1744        let schema = Schema::new(vec![
1745            Field::new("a", DataType::Int32, false),
1746            Field::new("b", DataType::Int32, true),
1747        ]);
1748
1749        // create some data
1750        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1751        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1752
1753        // build a record batch
1754        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
1755
1756        roundtrip(batch, Some(SMALL_SIZE / 2));
1757    }
1758
1759    fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1760        let mut buffer = vec![];
1761
1762        let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
1763        writer.write(expected_batch).unwrap();
1764        writer.close().unwrap();
1765
1766        buffer
1767    }
1768
1769    fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1770        let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
1771        writer.write(expected_batch).unwrap();
1772        writer.into_inner().unwrap()
1773    }
1774
1775    #[test]
1776    fn roundtrip_bytes() {
1777        // define schema
1778        let schema = Arc::new(Schema::new(vec![
1779            Field::new("a", DataType::Int32, false),
1780            Field::new("b", DataType::Int32, true),
1781        ]));
1782
1783        // create some data
1784        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1785        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1786
1787        // build a record batch
1788        let expected_batch =
1789            RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
1790
1791        for buffer in [
1792            get_bytes_after_close(schema.clone(), &expected_batch),
1793            get_bytes_by_into_inner(schema, &expected_batch),
1794        ] {
1795            let cursor = Bytes::from(buffer);
1796            let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
1797
1798            let actual_batch = record_batch_reader
1799                .next()
1800                .expect("No batch found")
1801                .expect("Unable to get batch");
1802
1803            assert_eq!(expected_batch.schema(), actual_batch.schema());
1804            assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
1805            assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
1806            for i in 0..expected_batch.num_columns() {
1807                let expected_data = expected_batch.column(i).to_data();
1808                let actual_data = actual_batch.column(i).to_data();
1809
1810                assert_eq!(expected_data, actual_data);
1811            }
1812        }
1813    }
1814
1815    #[test]
1816    fn arrow_writer_non_null() {
1817        // define schema
1818        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1819
1820        // create some data
1821        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1822
1823        // build a record batch
1824        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1825
1826        roundtrip(batch, Some(SMALL_SIZE / 2));
1827    }
1828
1829    #[test]
1830    fn arrow_writer_list() {
1831        // define schema
1832        let schema = Schema::new(vec![Field::new(
1833            "a",
1834            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1835            true,
1836        )]);
1837
1838        // create some data
1839        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1840
1841        // Construct a buffer for value offsets, for the nested array:
1842        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
1843        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1844
1845        // Construct a list array from the above two
1846        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1847            DataType::Int32,
1848            false,
1849        ))))
1850        .len(5)
1851        .add_buffer(a_value_offsets)
1852        .add_child_data(a_values.into_data())
1853        .null_bit_buffer(Some(Buffer::from([0b00011011])))
1854        .build()
1855        .unwrap();
1856        let a = ListArray::from(a_list_data);
1857
1858        // build a record batch
1859        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1860
1861        assert_eq!(batch.column(0).null_count(), 1);
1862
1863        // This test fails if the max row group size is less than the batch's length
1864        // see https://github.com/apache/arrow-rs/issues/518
1865        roundtrip(batch, None);
1866    }
1867
1868    #[test]
1869    fn arrow_writer_list_non_null() {
1870        // define schema
1871        let schema = Schema::new(vec![Field::new(
1872            "a",
1873            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1874            false,
1875        )]);
1876
1877        // create some data
1878        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1879
1880        // Construct a buffer for value offsets, for the nested array:
1881        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
1882        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1883
1884        // Construct a list array from the above two
1885        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1886            DataType::Int32,
1887            false,
1888        ))))
1889        .len(5)
1890        .add_buffer(a_value_offsets)
1891        .add_child_data(a_values.into_data())
1892        .build()
1893        .unwrap();
1894        let a = ListArray::from(a_list_data);
1895
1896        // build a record batch
1897        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1898
1899        // This test fails if the max row group size is less than the batch's length
1900        // see https://github.com/apache/arrow-rs/issues/518
1901        assert_eq!(batch.column(0).null_count(), 0);
1902
1903        roundtrip(batch, None);
1904    }
1905
1906    #[test]
1907    fn arrow_writer_list_view() {
1908        let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
1909        let schema = Schema::new(vec![Field::new(
1910            "a",
1911            DataType::ListView(list_field.clone()),
1912            true,
1913        )]);
1914
1915        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
1916        let a = ListViewArray::new(
1917            list_field,
1918            vec![0, 1, 0, 3, 6].into(),
1919            vec![1, 2, 0, 3, 4].into(),
1920            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1921            Some(vec![true, true, false, true, true].into()),
1922        );
1923
1924        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1925
1926        assert_eq!(batch.column(0).null_count(), 1);
1927
1928        roundtrip(batch, None);
1929    }
1930
1931    #[test]
1932    fn arrow_writer_list_view_non_null() {
1933        let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
1934        let schema = Schema::new(vec![Field::new(
1935            "a",
1936            DataType::ListView(list_field.clone()),
1937            false,
1938        )]);
1939
1940        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
1941        let a = ListViewArray::new(
1942            list_field,
1943            vec![0, 1, 0, 3, 6].into(),
1944            vec![1, 2, 0, 3, 4].into(),
1945            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1946            None,
1947        );
1948
1949        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1950
1951        assert_eq!(batch.column(0).null_count(), 0);
1952
1953        roundtrip(batch, None);
1954    }
1955
1956    #[test]
1957    fn arrow_writer_list_view_out_of_order() {
1958        let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
1959        let schema = Schema::new(vec![Field::new(
1960            "a",
1961            DataType::ListView(list_field.clone()),
1962            false,
1963        )]);
1964
1965        // [[1], [2, 3], [], [7, 8, 9, 10], [4, 5, 6]] - out of order offsets
1966        let a = ListViewArray::new(
1967            list_field,
1968            vec![0, 1, 0, 6, 3].into(),
1969            vec![1, 2, 0, 4, 3].into(),
1970            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1971            None,
1972        );
1973
1974        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1975
1976        roundtrip(batch, None);
1977    }
1978
1979    #[test]
1980    fn arrow_writer_large_list_view() {
1981        let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
1982        let schema = Schema::new(vec![Field::new(
1983            "a",
1984            DataType::LargeListView(list_field.clone()),
1985            true,
1986        )]);
1987
1988        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
1989        let a = LargeListViewArray::new(
1990            list_field,
1991            vec![0i64, 1, 0, 3, 6].into(),
1992            vec![1i64, 2, 0, 3, 4].into(),
1993            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1994            Some(vec![true, true, false, true, true].into()),
1995        );
1996
1997        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1998
1999        assert_eq!(batch.column(0).null_count(), 1);
2000
2001        roundtrip(batch, None);
2002    }
2003
2004    #[test]
2005    fn arrow_writer_list_view_with_struct() {
2006        // Test ListView containing Struct: ListView<Struct<Int32, Utf8>>
2007        let struct_fields = Fields::from(vec![
2008            Field::new("id", DataType::Int32, false),
2009            Field::new("name", DataType::Utf8, false),
2010        ]);
2011        let struct_type = DataType::Struct(struct_fields.clone());
2012        let list_field = Arc::new(Field::new("item", struct_type.clone(), false));
2013
2014        let schema = Schema::new(vec![Field::new(
2015            "a",
2016            DataType::ListView(list_field.clone()),
2017            true,
2018        )]);
2019
2020        // Create struct values
2021        let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
2022        let name_array = StringArray::from(vec!["a", "b", "c", "d", "e"]);
2023        let struct_array = StructArray::new(
2024            struct_fields,
2025            vec![Arc::new(id_array), Arc::new(name_array)],
2026            None,
2027        );
2028
2029        // Create ListView: [{1, "a"}, {2, "b"}], null, [{3, "c"}, {4, "d"}, {5, "e"}]
2030        let list_view = ListViewArray::new(
2031            list_field,
2032            vec![0, 2, 2].into(), // offsets
2033            vec![2, 0, 3].into(), // sizes
2034            Arc::new(struct_array),
2035            Some(vec![true, false, true].into()),
2036        );
2037
2038        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_view)]).unwrap();
2039
2040        roundtrip(batch, None);
2041    }
2042
2043    #[test]
2044    fn arrow_writer_binary() {
2045        let string_field = Field::new("a", DataType::Utf8, false);
2046        let binary_field = Field::new("b", DataType::Binary, false);
2047        let schema = Schema::new(vec![string_field, binary_field]);
2048
2049        let raw_string_values = vec!["foo", "bar", "baz", "quux"];
2050        let raw_binary_values = [
2051            b"foo".to_vec(),
2052            b"bar".to_vec(),
2053            b"baz".to_vec(),
2054            b"quux".to_vec(),
2055        ];
2056        let raw_binary_value_refs = raw_binary_values
2057            .iter()
2058            .map(|x| x.as_slice())
2059            .collect::<Vec<_>>();
2060
2061        let string_values = StringArray::from(raw_string_values.clone());
2062        let binary_values = BinaryArray::from(raw_binary_value_refs);
2063        let batch = RecordBatch::try_new(
2064            Arc::new(schema),
2065            vec![Arc::new(string_values), Arc::new(binary_values)],
2066        )
2067        .unwrap();
2068
2069        roundtrip(batch, Some(SMALL_SIZE / 2));
2070    }
2071
2072    #[test]
2073    fn arrow_writer_binary_view() {
2074        let string_field = Field::new("a", DataType::Utf8View, false);
2075        let binary_field = Field::new("b", DataType::BinaryView, false);
2076        let nullable_string_field = Field::new("a", DataType::Utf8View, true);
2077        let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
2078
2079        let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
2080        let raw_binary_values = vec![
2081            b"foo".to_vec(),
2082            b"bar".to_vec(),
2083            b"large payload over 12 bytes".to_vec(),
2084            b"lulu".to_vec(),
2085        ];
2086        let nullable_string_values =
2087            vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
2088
2089        let string_view_values = StringViewArray::from(raw_string_values);
2090        let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
2091        let nullable_string_view_values = StringViewArray::from(nullable_string_values);
2092        let batch = RecordBatch::try_new(
2093            Arc::new(schema),
2094            vec![
2095                Arc::new(string_view_values),
2096                Arc::new(binary_view_values),
2097                Arc::new(nullable_string_view_values),
2098            ],
2099        )
2100        .unwrap();
2101
2102        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
2103        roundtrip(batch, None);
2104    }
2105
2106    #[test]
2107    fn arrow_writer_binary_view_long_value() {
2108        let string_field = Field::new("a", DataType::Utf8View, false);
2109        let binary_field = Field::new("b", DataType::BinaryView, false);
2110        let schema = Schema::new(vec![string_field, binary_field]);
2111
2112        // There is special case validation for long values (greater than 128)
2113        // 128 encodes as 0x80 0x00 0x00 0x00 in little endian, which should
2114        // trigger the long-string UTF-8 validation branch in the plain decoder.
2115        let long = "a".repeat(128);
2116        let raw_string_values = vec!["foo", long.as_str(), "bar"];
2117        let raw_binary_values = vec![b"foo".to_vec(), long.as_bytes().to_vec(), b"bar".to_vec()];
2118
2119        let string_view_values: ArrayRef = Arc::new(StringViewArray::from(raw_string_values));
2120        let binary_view_values: ArrayRef =
2121            Arc::new(BinaryViewArray::from_iter_values(raw_binary_values));
2122
2123        one_column_roundtrip(Arc::clone(&string_view_values), false);
2124        one_column_roundtrip(Arc::clone(&binary_view_values), false);
2125
2126        let batch = RecordBatch::try_new(
2127            Arc::new(schema),
2128            vec![string_view_values, binary_view_values],
2129        )
2130        .unwrap();
2131
2132        // Disable dictionary to exercise plain encoding paths in the reader.
2133        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2134            let props = WriterProperties::builder()
2135                .set_writer_version(version)
2136                .set_dictionary_enabled(false)
2137                .build();
2138            roundtrip_opts(&batch, props);
2139        }
2140    }
2141
2142    fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
2143        let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
2144        let schema = Schema::new(vec![decimal_field]);
2145
2146        let decimal_values = vec![10_000, 50_000, 0, -100]
2147            .into_iter()
2148            .map(Some)
2149            .collect::<Decimal128Array>()
2150            .with_precision_and_scale(precision, scale)
2151            .unwrap();
2152
2153        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
2154    }
2155
2156    #[test]
2157    fn arrow_writer_decimal() {
2158        // int32 to store the decimal value
2159        let batch_int32_decimal = get_decimal_batch(5, 2);
2160        roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
2161        // int64 to store the decimal value
2162        let batch_int64_decimal = get_decimal_batch(12, 2);
2163        roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
2164        // fixed_length_byte_array to store the decimal value
2165        let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
2166        roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
2167    }
2168
2169    #[test]
2170    fn arrow_writer_complex() {
2171        // define schema
2172        let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
2173        let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
2174        let struct_field_g = Arc::new(Field::new_list(
2175            "g",
2176            Field::new_list_field(DataType::Int16, true),
2177            false,
2178        ));
2179        let struct_field_h = Arc::new(Field::new_list(
2180            "h",
2181            Field::new_list_field(DataType::Int16, false),
2182            true,
2183        ));
2184        let struct_field_e = Arc::new(Field::new_struct(
2185            "e",
2186            vec![
2187                struct_field_f.clone(),
2188                struct_field_g.clone(),
2189                struct_field_h.clone(),
2190            ],
2191            false,
2192        ));
2193        let schema = Schema::new(vec![
2194            Field::new("a", DataType::Int32, false),
2195            Field::new("b", DataType::Int32, true),
2196            Field::new_struct(
2197                "c",
2198                vec![struct_field_d.clone(), struct_field_e.clone()],
2199                false,
2200            ),
2201        ]);
2202
2203        // create some data
2204        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2205        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2206        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
2207        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
2208
2209        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2210
2211        // Construct a buffer for value offsets, for the nested array:
2212        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
2213        let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2214
2215        // Construct a list array from the above two
2216        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
2217            .len(5)
2218            .add_buffer(g_value_offsets.clone())
2219            .add_child_data(g_value.to_data())
2220            .build()
2221            .unwrap();
2222        let g = ListArray::from(g_list_data);
2223        // The difference between g and h is that h has a null bitmap
2224        let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
2225            .len(5)
2226            .add_buffer(g_value_offsets)
2227            .add_child_data(g_value.to_data())
2228            .null_bit_buffer(Some(Buffer::from([0b00011011])))
2229            .build()
2230            .unwrap();
2231        let h = ListArray::from(h_list_data);
2232
2233        let e = StructArray::from(vec![
2234            (struct_field_f, Arc::new(f) as ArrayRef),
2235            (struct_field_g, Arc::new(g) as ArrayRef),
2236            (struct_field_h, Arc::new(h) as ArrayRef),
2237        ]);
2238
2239        let c = StructArray::from(vec![
2240            (struct_field_d, Arc::new(d) as ArrayRef),
2241            (struct_field_e, Arc::new(e) as ArrayRef),
2242        ]);
2243
2244        // build a record batch
2245        let batch = RecordBatch::try_new(
2246            Arc::new(schema),
2247            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
2248        )
2249        .unwrap();
2250
2251        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
2252        roundtrip(batch, Some(SMALL_SIZE / 3));
2253    }
2254
2255    #[test]
2256    fn arrow_writer_complex_mixed() {
2257        // This test was added while investigating https://github.com/apache/arrow-rs/issues/244.
2258        // It was subsequently fixed while investigating https://github.com/apache/arrow-rs/issues/245.
2259
2260        // define schema
2261        let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
2262        let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
2263        let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
2264        let schema = Schema::new(vec![Field::new(
2265            "some_nested_object",
2266            DataType::Struct(Fields::from(vec![
2267                offset_field.clone(),
2268                partition_field.clone(),
2269                topic_field.clone(),
2270            ])),
2271            false,
2272        )]);
2273
2274        // create some data
2275        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
2276        let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2277        let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
2278
2279        let some_nested_object = StructArray::from(vec![
2280            (offset_field, Arc::new(offset) as ArrayRef),
2281            (partition_field, Arc::new(partition) as ArrayRef),
2282            (topic_field, Arc::new(topic) as ArrayRef),
2283        ]);
2284
2285        // build a record batch
2286        let batch =
2287            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
2288
2289        roundtrip(batch, Some(SMALL_SIZE / 2));
2290    }
2291
2292    #[test]
2293    fn arrow_writer_map() {
2294        // Note: we are using the JSON Arrow reader for brevity
2295        let json_content = r#"
2296        {"stocks":{"long": "$AAA", "short": "$BBB"}}
2297        {"stocks":{"long": null, "long": "$CCC", "short": null}}
2298        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
2299        "#;
2300        let entries_struct_type = DataType::Struct(Fields::from(vec![
2301            Field::new("key", DataType::Utf8, false),
2302            Field::new("value", DataType::Utf8, true),
2303        ]));
2304        let stocks_field = Field::new(
2305            "stocks",
2306            DataType::Map(
2307                Arc::new(Field::new("entries", entries_struct_type, false)),
2308                false,
2309            ),
2310            true,
2311        );
2312        let schema = Arc::new(Schema::new(vec![stocks_field]));
2313        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
2314        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
2315
2316        let batch = reader.next().unwrap().unwrap();
2317        roundtrip(batch, None);
2318    }
2319
2320    #[test]
2321    fn arrow_writer_2_level_struct() {
2322        // tests writing <struct<struct<primitive>>
2323        let field_c = Field::new("c", DataType::Int32, true);
2324        let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
2325        let type_a = DataType::Struct(vec![field_b.clone()].into());
2326        let field_a = Field::new("a", type_a, true);
2327        let schema = Schema::new(vec![field_a.clone()]);
2328
2329        // create data
2330        let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
2331        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2332            .len(6)
2333            .null_bit_buffer(Some(Buffer::from([0b00100111])))
2334            .add_child_data(c.into_data())
2335            .build()
2336            .unwrap();
2337        let b = StructArray::from(b_data);
2338        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2339            .len(6)
2340            .null_bit_buffer(Some(Buffer::from([0b00101111])))
2341            .add_child_data(b.into_data())
2342            .build()
2343            .unwrap();
2344        let a = StructArray::from(a_data);
2345
2346        assert_eq!(a.null_count(), 1);
2347        assert_eq!(a.column(0).null_count(), 2);
2348
2349        // build a racord batch
2350        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2351
2352        roundtrip(batch, Some(SMALL_SIZE / 2));
2353    }
2354
2355    #[test]
2356    fn arrow_writer_2_level_struct_non_null() {
2357        // tests writing <struct<struct<primitive>>
2358        let field_c = Field::new("c", DataType::Int32, false);
2359        let type_b = DataType::Struct(vec![field_c].into());
2360        let field_b = Field::new("b", type_b.clone(), false);
2361        let type_a = DataType::Struct(vec![field_b].into());
2362        let field_a = Field::new("a", type_a.clone(), false);
2363        let schema = Schema::new(vec![field_a]);
2364
2365        // create data
2366        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2367        let b_data = ArrayDataBuilder::new(type_b)
2368            .len(6)
2369            .add_child_data(c.into_data())
2370            .build()
2371            .unwrap();
2372        let b = StructArray::from(b_data);
2373        let a_data = ArrayDataBuilder::new(type_a)
2374            .len(6)
2375            .add_child_data(b.into_data())
2376            .build()
2377            .unwrap();
2378        let a = StructArray::from(a_data);
2379
2380        assert_eq!(a.null_count(), 0);
2381        assert_eq!(a.column(0).null_count(), 0);
2382
2383        // build a racord batch
2384        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2385
2386        roundtrip(batch, Some(SMALL_SIZE / 2));
2387    }
2388
2389    #[test]
2390    fn arrow_writer_2_level_struct_mixed_null() {
2391        // tests writing <struct<struct<primitive>>
2392        let field_c = Field::new("c", DataType::Int32, false);
2393        let type_b = DataType::Struct(vec![field_c].into());
2394        let field_b = Field::new("b", type_b.clone(), true);
2395        let type_a = DataType::Struct(vec![field_b].into());
2396        let field_a = Field::new("a", type_a.clone(), false);
2397        let schema = Schema::new(vec![field_a]);
2398
2399        // create data
2400        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2401        let b_data = ArrayDataBuilder::new(type_b)
2402            .len(6)
2403            .null_bit_buffer(Some(Buffer::from([0b00100111])))
2404            .add_child_data(c.into_data())
2405            .build()
2406            .unwrap();
2407        let b = StructArray::from(b_data);
2408        // a intentionally has no null buffer, to test that this is handled correctly
2409        let a_data = ArrayDataBuilder::new(type_a)
2410            .len(6)
2411            .add_child_data(b.into_data())
2412            .build()
2413            .unwrap();
2414        let a = StructArray::from(a_data);
2415
2416        assert_eq!(a.null_count(), 0);
2417        assert_eq!(a.column(0).null_count(), 2);
2418
2419        // build a racord batch
2420        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2421
2422        roundtrip(batch, Some(SMALL_SIZE / 2));
2423    }
2424
2425    #[test]
2426    fn arrow_writer_2_level_struct_mixed_null_2() {
2427        // tests writing <struct<struct<primitive>>, where the primitive columns are non-null.
2428        let field_c = Field::new("c", DataType::Int32, false);
2429        let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
2430        let field_e = Field::new(
2431            "e",
2432            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2433            false,
2434        );
2435
2436        let field_b = Field::new(
2437            "b",
2438            DataType::Struct(vec![field_c, field_d, field_e].into()),
2439            false,
2440        );
2441        let type_a = DataType::Struct(vec![field_b.clone()].into());
2442        let field_a = Field::new("a", type_a, true);
2443        let schema = Schema::new(vec![field_a.clone()]);
2444
2445        // create data
2446        let c = Int32Array::from_iter_values(0..6);
2447        let d = FixedSizeBinaryArray::try_from_iter(
2448            ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
2449        )
2450        .expect("four byte values");
2451        let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
2452        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2453            .len(6)
2454            .add_child_data(c.into_data())
2455            .add_child_data(d.into_data())
2456            .add_child_data(e.into_data())
2457            .build()
2458            .unwrap();
2459        let b = StructArray::from(b_data);
2460        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2461            .len(6)
2462            .null_bit_buffer(Some(Buffer::from([0b00100101])))
2463            .add_child_data(b.into_data())
2464            .build()
2465            .unwrap();
2466        let a = StructArray::from(a_data);
2467
2468        assert_eq!(a.null_count(), 3);
2469        assert_eq!(a.column(0).null_count(), 0);
2470
2471        // build a record batch
2472        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2473
2474        roundtrip(batch, Some(SMALL_SIZE / 2));
2475    }
2476
2477    #[test]
2478    fn test_fixed_size_binary_in_dict() {
2479        fn test_fixed_size_binary_in_dict_inner<K>()
2480        where
2481            K: ArrowDictionaryKeyType,
2482            K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
2483            <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
2484        {
2485            let field = Field::new(
2486                "a",
2487                DataType::Dictionary(
2488                    Box::new(K::DATA_TYPE),
2489                    Box::new(DataType::FixedSizeBinary(4)),
2490                ),
2491                false,
2492            );
2493            let schema = Schema::new(vec![field]);
2494
2495            let keys: Vec<K::Native> = vec![
2496                K::Native::try_from(0u8).unwrap(),
2497                K::Native::try_from(0u8).unwrap(),
2498                K::Native::try_from(1u8).unwrap(),
2499            ];
2500            let keys = PrimitiveArray::<K>::from_iter_values(keys);
2501            let values = FixedSizeBinaryArray::try_from_iter(
2502                vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
2503            )
2504            .unwrap();
2505
2506            let data = DictionaryArray::<K>::new(keys, Arc::new(values));
2507            let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
2508            roundtrip(batch, None);
2509        }
2510
2511        test_fixed_size_binary_in_dict_inner::<UInt8Type>();
2512        test_fixed_size_binary_in_dict_inner::<UInt16Type>();
2513        test_fixed_size_binary_in_dict_inner::<UInt32Type>();
2514        test_fixed_size_binary_in_dict_inner::<UInt16Type>();
2515        test_fixed_size_binary_in_dict_inner::<Int8Type>();
2516        test_fixed_size_binary_in_dict_inner::<Int16Type>();
2517        test_fixed_size_binary_in_dict_inner::<Int32Type>();
2518        test_fixed_size_binary_in_dict_inner::<Int64Type>();
2519    }
2520
2521    #[test]
2522    fn test_empty_dict() {
2523        let struct_fields = Fields::from(vec![Field::new(
2524            "dict",
2525            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2526            false,
2527        )]);
2528
2529        let schema = Schema::new(vec![Field::new_struct(
2530            "struct",
2531            struct_fields.clone(),
2532            true,
2533        )]);
2534        let dictionary = Arc::new(DictionaryArray::new(
2535            Int32Array::new_null(5),
2536            Arc::new(StringArray::new_null(0)),
2537        ));
2538
2539        let s = StructArray::new(
2540            struct_fields,
2541            vec![dictionary],
2542            Some(NullBuffer::new_null(5)),
2543        );
2544
2545        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
2546        roundtrip(batch, None);
2547    }
2548    #[test]
2549    fn arrow_writer_page_size() {
2550        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
2551
2552        let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
2553
2554        // Generate an array of 10 unique 10 character string
2555        for i in 0..10 {
2556            let value = i
2557                .to_string()
2558                .repeat(10)
2559                .chars()
2560                .take(10)
2561                .collect::<String>();
2562
2563            builder.append_value(value);
2564        }
2565
2566        let array = Arc::new(builder.finish());
2567
2568        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
2569
2570        let file = tempfile::tempfile().unwrap();
2571
2572        // Set everything very low so we fallback to PLAIN encoding after the first row
2573        let props = WriterProperties::builder()
2574            .set_data_page_size_limit(1)
2575            .set_dictionary_page_size_limit(1)
2576            .set_write_batch_size(1)
2577            .build();
2578
2579        let mut writer =
2580            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
2581                .expect("Unable to write file");
2582        writer.write(&batch).unwrap();
2583        writer.close().unwrap();
2584
2585        let options = ReadOptionsBuilder::new().with_page_index().build();
2586        let reader =
2587            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap();
2588
2589        let column = reader.metadata().row_group(0).columns();
2590
2591        assert_eq!(column.len(), 1);
2592
2593        // We should write one row before falling back to PLAIN encoding so there should still be a
2594        // dictionary page.
2595        assert!(
2596            column[0].dictionary_page_offset().is_some(),
2597            "Expected a dictionary page"
2598        );
2599
2600        assert!(reader.metadata().offset_index().is_some());
2601        let offset_indexes = &reader.metadata().offset_index().unwrap()[0];
2602
2603        let page_locations = offset_indexes[0].page_locations.clone();
2604
2605        // We should fallback to PLAIN encoding after the first row and our max page size is 1 bytes
2606        // so we expect one dictionary encoded page and then a page per row thereafter.
2607        assert_eq!(
2608            page_locations.len(),
2609            10,
2610            "Expected 10 pages but got {page_locations:#?}"
2611        );
2612    }
2613
2614    #[test]
2615    fn arrow_writer_float_nans() {
2616        let f16_field = Field::new("a", DataType::Float16, false);
2617        let f32_field = Field::new("b", DataType::Float32, false);
2618        let f64_field = Field::new("c", DataType::Float64, false);
2619        let schema = Schema::new(vec![f16_field, f32_field, f64_field]);
2620
2621        let f16_values = (0..MEDIUM_SIZE)
2622            .map(|i| {
2623                Some(if i % 2 == 0 {
2624                    f16::NAN
2625                } else {
2626                    f16::from_f32(i as f32)
2627                })
2628            })
2629            .collect::<Float16Array>();
2630
2631        let f32_values = (0..MEDIUM_SIZE)
2632            .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
2633            .collect::<Float32Array>();
2634
2635        let f64_values = (0..MEDIUM_SIZE)
2636            .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
2637            .collect::<Float64Array>();
2638
2639        let batch = RecordBatch::try_new(
2640            Arc::new(schema),
2641            vec![
2642                Arc::new(f16_values),
2643                Arc::new(f32_values),
2644                Arc::new(f64_values),
2645            ],
2646        )
2647        .unwrap();
2648
2649        roundtrip(batch, None);
2650    }
2651
    // Row count used by many of the small test batches in this module; several
    // row-group size limits are derived from it (e.g. `SMALL_SIZE / 2`).
    const SMALL_SIZE: usize = 7;
    // Row count for tests that want a somewhat larger batch (e.g. the float NaN test).
    const MEDIUM_SIZE: usize = 63;
2654
2655    // Write the batch to parquet and read it back out, ensuring
2656    // that what comes out is the same as what was written in
2657    fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<Bytes> {
2658        let mut files = vec![];
2659        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2660            let mut props = WriterProperties::builder().set_writer_version(version);
2661
2662            if let Some(size) = max_row_group_size {
2663                props = props.set_max_row_group_row_count(Some(size))
2664            }
2665
2666            let props = props.build();
2667            files.push(roundtrip_opts(&expected_batch, props))
2668        }
2669        files
2670    }
2671
2672    // Round trip the specified record batch with the specified writer properties,
2673    // to an in-memory file, and validate the arrays using the specified function.
2674    // Returns the in-memory file.
2675    fn roundtrip_opts_with_array_validation<F>(
2676        expected_batch: &RecordBatch,
2677        props: WriterProperties,
2678        validate: F,
2679    ) -> Bytes
2680    where
2681        F: Fn(&ArrayData, &ArrayData),
2682    {
2683        let mut file = vec![];
2684
2685        let mut writer = ArrowWriter::try_new(&mut file, expected_batch.schema(), Some(props))
2686            .expect("Unable to write file");
2687        writer.write(expected_batch).unwrap();
2688        writer.close().unwrap();
2689
2690        let file = Bytes::from(file);
2691        let mut record_batch_reader =
2692            ParquetRecordBatchReader::try_new(file.clone(), 1024).unwrap();
2693
2694        let actual_batch = record_batch_reader
2695            .next()
2696            .expect("No batch found")
2697            .expect("Unable to get batch");
2698
2699        assert_eq!(expected_batch.schema(), actual_batch.schema());
2700        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
2701        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
2702        for i in 0..expected_batch.num_columns() {
2703            let expected_data = expected_batch.column(i).to_data();
2704            let actual_data = actual_batch.column(i).to_data();
2705            validate(&expected_data, &actual_data);
2706        }
2707
2708        file
2709    }
2710
2711    fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> Bytes {
2712        roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
2713            a.validate_full().expect("valid expected data");
2714            b.validate_full().expect("valid actual data");
2715            assert_eq!(a, b)
2716        })
2717    }
2718
2719    struct RoundTripOptions {
2720        values: ArrayRef,
2721        schema: SchemaRef,
2722        bloom_filter: bool,
2723        bloom_filter_ndv: Option<u64>,
2724        bloom_filter_position: BloomFilterPosition,
2725    }
2726
2727    impl RoundTripOptions {
2728        fn new(values: ArrayRef, nullable: bool) -> Self {
2729            let data_type = values.data_type().clone();
2730            let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
2731            Self {
2732                values,
2733                schema: Arc::new(schema),
2734                bloom_filter: false,
2735                bloom_filter_ndv: None,
2736                bloom_filter_position: BloomFilterPosition::AfterRowGroup,
2737            }
2738        }
2739    }
2740
2741    fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<Bytes> {
2742        one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
2743    }
2744
2745    fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<Bytes> {
2746        let mut options = RoundTripOptions::new(values, false);
2747        options.schema = schema;
2748        one_column_roundtrip_with_options(options)
2749    }
2750
2751    fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<Bytes> {
2752        let RoundTripOptions {
2753            values,
2754            schema,
2755            bloom_filter,
2756            bloom_filter_ndv,
2757            bloom_filter_position,
2758        } = options;
2759
2760        let encodings = match values.data_type() {
2761            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
2762                vec![
2763                    Encoding::PLAIN,
2764                    Encoding::DELTA_BYTE_ARRAY,
2765                    Encoding::DELTA_LENGTH_BYTE_ARRAY,
2766                ]
2767            }
2768            DataType::Int64
2769            | DataType::Int32
2770            | DataType::Int16
2771            | DataType::Int8
2772            | DataType::UInt64
2773            | DataType::UInt32
2774            | DataType::UInt16
2775            | DataType::UInt8 => vec![
2776                Encoding::PLAIN,
2777                Encoding::DELTA_BINARY_PACKED,
2778                Encoding::BYTE_STREAM_SPLIT,
2779            ],
2780            DataType::Float32 | DataType::Float64 => {
2781                vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
2782            }
2783            _ => vec![Encoding::PLAIN],
2784        };
2785
2786        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2787
2788        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2789
2790        let mut files = vec![];
2791        for dictionary_size in [0, 1, 1024] {
2792            for encoding in &encodings {
2793                for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2794                    for row_group_size in row_group_sizes {
2795                        let mut builder = WriterProperties::builder()
2796                            .set_writer_version(version)
2797                            .set_max_row_group_row_count(Some(row_group_size))
2798                            .set_dictionary_enabled(dictionary_size != 0)
2799                            .set_dictionary_page_size_limit(dictionary_size.max(1))
2800                            .set_encoding(*encoding)
2801                            .set_bloom_filter_enabled(bloom_filter)
2802                            .set_bloom_filter_position(bloom_filter_position);
2803                        if let Some(ndv) = bloom_filter_ndv {
2804                            builder = builder.set_bloom_filter_max_ndv(ndv);
2805                        }
2806                        let props = builder.build();
2807
2808                        files.push(roundtrip_opts(&expected_batch, props))
2809                    }
2810                }
2811            }
2812        }
2813        files
2814    }
2815
2816    fn values_required<A, I>(iter: I) -> Vec<Bytes>
2817    where
2818        A: From<Vec<I::Item>> + Array + 'static,
2819        I: IntoIterator,
2820    {
2821        let raw_values: Vec<_> = iter.into_iter().collect();
2822        let values = Arc::new(A::from(raw_values));
2823        one_column_roundtrip(values, false)
2824    }
2825
2826    fn values_optional<A, I>(iter: I) -> Vec<Bytes>
2827    where
2828        A: From<Vec<Option<I::Item>>> + Array + 'static,
2829        I: IntoIterator,
2830    {
2831        let optional_raw_values: Vec<_> = iter
2832            .into_iter()
2833            .enumerate()
2834            .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
2835            .collect();
2836        let optional_values = Arc::new(A::from(optional_raw_values));
2837        one_column_roundtrip(optional_values, true)
2838    }
2839
2840    fn required_and_optional<A, I>(iter: I)
2841    where
2842        A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
2843        I: IntoIterator + Clone,
2844    {
2845        values_required::<A, I>(iter.clone());
2846        values_optional::<A, I>(iter);
2847    }
2848
2849    fn check_bloom_filter<T: AsBytes>(
2850        files: Vec<Bytes>,
2851        file_column: String,
2852        positive_values: Vec<T>,
2853        negative_values: Vec<T>,
2854    ) {
2855        files.into_iter().take(1).for_each(|file| {
2856            let file_reader = SerializedFileReader::new_with_options(
2857                file,
2858                ReadOptionsBuilder::new()
2859                    .with_reader_properties(
2860                        ReaderProperties::builder()
2861                            .set_read_bloom_filter(true)
2862                            .build(),
2863                    )
2864                    .build(),
2865            )
2866            .expect("Unable to open file as Parquet");
2867            let metadata = file_reader.metadata();
2868
2869            // Gets bloom filters from all row groups.
2870            let mut bloom_filters: Vec<_> = vec![];
2871            for (ri, row_group) in metadata.row_groups().iter().enumerate() {
2872                if let Some((column_index, _)) = row_group
2873                    .columns()
2874                    .iter()
2875                    .enumerate()
2876                    .find(|(_, column)| column.column_path().string() == file_column)
2877                {
2878                    let row_group_reader = file_reader
2879                        .get_row_group(ri)
2880                        .expect("Unable to read row group");
2881                    if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
2882                        bloom_filters.push(sbbf.clone());
2883                    } else {
2884                        panic!("No bloom filter for column named {file_column} found");
2885                    }
2886                } else {
2887                    panic!("No column named {file_column} found");
2888                }
2889            }
2890
2891            positive_values.iter().for_each(|value| {
2892                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2893                assert!(
2894                    found.is_some(),
2895                    "{}",
2896                    format!("Value {:?} should be in bloom filter", value.as_bytes())
2897                );
2898            });
2899
2900            negative_values.iter().for_each(|value| {
2901                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2902                assert!(
2903                    found.is_none(),
2904                    "{}",
2905                    format!("Value {:?} should not be in bloom filter", value.as_bytes())
2906                );
2907            });
2908        });
2909    }
2910
2911    #[test]
2912    fn all_null_primitive_single_column() {
2913        let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
2914        one_column_roundtrip(values, true);
2915    }
2916    #[test]
2917    fn null_single_column() {
2918        let values = Arc::new(NullArray::new(SMALL_SIZE));
2919        one_column_roundtrip(values, true);
2920        // null arrays are always nullable, a test with non-nullable nulls fails
2921    }
2922
2923    #[test]
2924    fn bool_single_column() {
2925        required_and_optional::<BooleanArray, _>(
2926            [true, false].iter().cycle().copied().take(SMALL_SIZE),
2927        );
2928    }
2929
2930    #[test]
2931    fn bool_large_single_column() {
2932        let values = Arc::new(
2933            [None, Some(true), Some(false)]
2934                .iter()
2935                .cycle()
2936                .copied()
2937                .take(200_000)
2938                .collect::<BooleanArray>(),
2939        );
2940        let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
2941        let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2942        let file = tempfile::tempfile().unwrap();
2943
2944        let mut writer =
2945            ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
2946                .expect("Unable to write file");
2947        writer.write(&expected_batch).unwrap();
2948        writer.close().unwrap();
2949    }
2950
2951    #[test]
2952    fn check_page_offset_index_with_nan() {
2953        let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
2954        let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
2955        let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2956
2957        let mut out = Vec::with_capacity(1024);
2958        let mut writer =
2959            ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
2960        writer.write(&batch).unwrap();
2961        let file_meta_data = writer.close().unwrap();
2962        for row_group in file_meta_data.row_groups() {
2963            for column in row_group.columns() {
2964                assert!(column.offset_index_offset().is_some());
2965                assert!(column.offset_index_length().is_some());
2966                assert!(column.column_index_offset().is_none());
2967                assert!(column.column_index_length().is_none());
2968            }
2969        }
2970    }
2971
2972    #[test]
2973    fn i8_single_column() {
2974        required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
2975    }
2976
2977    #[test]
2978    fn i16_single_column() {
2979        required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
2980    }
2981
2982    #[test]
2983    fn i32_single_column() {
2984        required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
2985    }
2986
2987    #[test]
2988    fn i64_single_column() {
2989        required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
2990    }
2991
2992    #[test]
2993    fn u8_single_column() {
2994        required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
2995    }
2996
2997    #[test]
2998    fn u16_single_column() {
2999        required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
3000    }
3001
3002    #[test]
3003    fn u32_single_column() {
3004        required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
3005    }
3006
3007    #[test]
3008    fn u64_single_column() {
3009        required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
3010    }
3011
3012    #[test]
3013    fn f32_single_column() {
3014        required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
3015    }
3016
3017    #[test]
3018    fn f64_single_column() {
3019        required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
3020    }
3021
3022    // The timestamp array types don't implement From<Vec<T>> because they need the timezone
3023    // argument, and they also doesn't support building from a Vec<Option<T>>, so call
3024    // one_column_roundtrip manually instead of calling required_and_optional for these tests.
3025
3026    #[test]
3027    fn timestamp_second_single_column() {
3028        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
3029        let values = Arc::new(TimestampSecondArray::from(raw_values));
3030
3031        one_column_roundtrip(values, false);
3032    }
3033
3034    #[test]
3035    fn timestamp_millisecond_single_column() {
3036        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
3037        let values = Arc::new(TimestampMillisecondArray::from(raw_values));
3038
3039        one_column_roundtrip(values, false);
3040    }
3041
3042    #[test]
3043    fn timestamp_microsecond_single_column() {
3044        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
3045        let values = Arc::new(TimestampMicrosecondArray::from(raw_values));
3046
3047        one_column_roundtrip(values, false);
3048    }
3049
3050    #[test]
3051    fn timestamp_nanosecond_single_column() {
3052        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
3053        let values = Arc::new(TimestampNanosecondArray::from(raw_values));
3054
3055        one_column_roundtrip(values, false);
3056    }
3057
3058    #[test]
3059    fn date32_single_column() {
3060        required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
3061    }
3062
3063    #[test]
3064    fn date64_single_column() {
3065        // Date64 must be a multiple of 86400000, see ARROW-10925
3066        required_and_optional::<Date64Array, _>(
3067            (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
3068        );
3069    }
3070
3071    #[test]
3072    fn time32_second_single_column() {
3073        required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
3074    }
3075
3076    #[test]
3077    fn time32_millisecond_single_column() {
3078        required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
3079    }
3080
3081    #[test]
3082    fn time64_microsecond_single_column() {
3083        required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
3084    }
3085
3086    #[test]
3087    fn time64_nanosecond_single_column() {
3088        required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
3089    }
3090
3091    #[test]
3092    fn duration_second_single_column() {
3093        required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
3094    }
3095
3096    #[test]
3097    fn duration_millisecond_single_column() {
3098        required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
3099    }
3100
3101    #[test]
3102    fn duration_microsecond_single_column() {
3103        required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
3104    }
3105
3106    #[test]
3107    fn duration_nanosecond_single_column() {
3108        required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
3109    }
3110
3111    #[test]
3112    fn interval_year_month_single_column() {
3113        required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
3114    }
3115
3116    #[test]
3117    fn interval_day_time_single_column() {
3118        required_and_optional::<IntervalDayTimeArray, _>(vec![
3119            IntervalDayTime::new(0, 1),
3120            IntervalDayTime::new(0, 3),
3121            IntervalDayTime::new(3, -2),
3122            IntervalDayTime::new(-200, 4),
3123        ]);
3124    }
3125
3126    #[test]
3127    #[should_panic(
3128        expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
3129    )]
3130    fn interval_month_day_nano_single_column() {
3131        required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
3132            IntervalMonthDayNano::new(0, 1, 5),
3133            IntervalMonthDayNano::new(0, 3, 2),
3134            IntervalMonthDayNano::new(3, -2, -5),
3135            IntervalMonthDayNano::new(-200, 4, -1),
3136        ]);
3137    }
3138
3139    #[test]
3140    fn binary_single_column() {
3141        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
3142        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
3143        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
3144
3145        // BinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
3146        values_required::<BinaryArray, _>(many_vecs_iter);
3147    }
3148
3149    #[test]
3150    fn binary_view_single_column() {
3151        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
3152        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
3153        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
3154
3155        // BinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
3156        values_required::<BinaryViewArray, _>(many_vecs_iter);
3157    }
3158
3159    #[test]
3160    fn i32_column_bloom_filter_at_end() {
3161        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
3162        let mut options = RoundTripOptions::new(array, false);
3163        options.bloom_filter = true;
3164        options.bloom_filter_position = BloomFilterPosition::End;
3165
3166        let files = one_column_roundtrip_with_options(options);
3167        check_bloom_filter(
3168            files,
3169            "col".to_string(),
3170            (0..SMALL_SIZE as i32).collect(),
3171            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
3172        );
3173    }
3174
3175    #[test]
3176    fn i32_column_bloom_filter() {
3177        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
3178        let mut options = RoundTripOptions::new(array, false);
3179        options.bloom_filter = true;
3180
3181        let files = one_column_roundtrip_with_options(options);
3182        check_bloom_filter(
3183            files,
3184            "col".to_string(),
3185            (0..SMALL_SIZE as i32).collect(),
3186            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
3187        );
3188    }
3189
3190    /// Test that bloom filter folding produces correct results even when
3191    /// the configured NDV differs significantly from actual NDV.
3192    /// A large NDV means a larger initial filter that gets folded down;
3193    /// a small NDV means a smaller initial filter.
3194    #[test]
3195    fn i32_column_bloom_filter_fixed_ndv() {
3196        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
3197
3198        // NDV much larger than actual distinct values — tests folding a large filter down
3199        let mut options = RoundTripOptions::new(array.clone(), false);
3200        options.bloom_filter = true;
3201        options.bloom_filter_ndv = Some(1_000_000);
3202
3203        let files = one_column_roundtrip_with_options(options);
3204        check_bloom_filter(
3205            files,
3206            "col".to_string(),
3207            (0..SMALL_SIZE as i32).collect(),
3208            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
3209        );
3210
3211        // NDV smaller than actual distinct values — tests the underestimate path
3212        let mut options = RoundTripOptions::new(array, false);
3213        options.bloom_filter = true;
3214        options.bloom_filter_ndv = Some(3);
3215
3216        let files = one_column_roundtrip_with_options(options);
3217        check_bloom_filter(
3218            files,
3219            "col".to_string(),
3220            (0..SMALL_SIZE as i32).collect(),
3221            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
3222        );
3223    }
3224
3225    #[test]
3226    fn binary_column_bloom_filter() {
3227        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
3228        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
3229        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
3230
3231        let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
3232        let mut options = RoundTripOptions::new(array, false);
3233        options.bloom_filter = true;
3234
3235        let files = one_column_roundtrip_with_options(options);
3236        check_bloom_filter(
3237            files,
3238            "col".to_string(),
3239            many_vecs,
3240            vec![vec![(SMALL_SIZE + 1) as u8]],
3241        );
3242    }
3243
3244    #[test]
3245    fn empty_string_null_column_bloom_filter() {
3246        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
3247        let raw_strs = raw_values.iter().map(|s| s.as_str());
3248
3249        let array = Arc::new(StringArray::from_iter_values(raw_strs));
3250        let mut options = RoundTripOptions::new(array, false);
3251        options.bloom_filter = true;
3252
3253        let files = one_column_roundtrip_with_options(options);
3254
3255        let optional_raw_values: Vec<_> = raw_values
3256            .iter()
3257            .enumerate()
3258            .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
3259            .collect();
3260        // For null slots, empty string should not be in bloom filter.
3261        check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
3262    }
3263
3264    #[test]
3265    fn large_binary_single_column() {
3266        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
3267        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
3268        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
3269
3270        // LargeBinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
3271        values_required::<LargeBinaryArray, _>(many_vecs_iter);
3272    }
3273
3274    #[test]
3275    fn fixed_size_binary_single_column() {
3276        let mut builder = FixedSizeBinaryBuilder::new(4);
3277        builder.append_value(b"0123").unwrap();
3278        builder.append_null();
3279        builder.append_value(b"8910").unwrap();
3280        builder.append_value(b"1112").unwrap();
3281        let array = Arc::new(builder.finish());
3282
3283        one_column_roundtrip(array, true);
3284    }
3285
3286    #[test]
3287    fn string_single_column() {
3288        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
3289        let raw_strs = raw_values.iter().map(|s| s.as_str());
3290
3291        required_and_optional::<StringArray, _>(raw_strs);
3292    }
3293
3294    #[test]
3295    fn large_string_single_column() {
3296        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
3297        let raw_strs = raw_values.iter().map(|s| s.as_str());
3298
3299        required_and_optional::<LargeStringArray, _>(raw_strs);
3300    }
3301
3302    #[test]
3303    fn string_view_single_column() {
3304        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
3305        let raw_strs = raw_values.iter().map(|s| s.as_str());
3306
3307        required_and_optional::<StringViewArray, _>(raw_strs);
3308    }
3309
3310    #[test]
3311    fn null_list_single_column() {
3312        let null_field = Field::new_list_field(DataType::Null, true);
3313        let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);
3314
3315        let schema = Schema::new(vec![list_field]);
3316
3317        // Build [[], null, [null, null]]
3318        let a_values = NullArray::new(2);
3319        let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
3320        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
3321            DataType::Null,
3322            true,
3323        ))))
3324        .len(3)
3325        .add_buffer(a_value_offsets)
3326        .null_bit_buffer(Some(Buffer::from([0b00000101])))
3327        .add_child_data(a_values.into_data())
3328        .build()
3329        .unwrap();
3330
3331        let a = ListArray::from(a_list_data);
3332
3333        assert!(a.is_valid(0));
3334        assert!(!a.is_valid(1));
3335        assert!(a.is_valid(2));
3336
3337        assert_eq!(a.value(0).len(), 0);
3338        assert_eq!(a.value(2).len(), 2);
3339        assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);
3340
3341        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
3342        roundtrip(batch, None);
3343    }
3344
3345    #[test]
3346    fn list_single_column() {
3347        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3348        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
3349        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
3350            DataType::Int32,
3351            false,
3352        ))))
3353        .len(5)
3354        .add_buffer(a_value_offsets)
3355        .null_bit_buffer(Some(Buffer::from([0b00011011])))
3356        .add_child_data(a_values.into_data())
3357        .build()
3358        .unwrap();
3359
3360        assert_eq!(a_list_data.null_count(), 1);
3361
3362        let a = ListArray::from(a_list_data);
3363        let values = Arc::new(a);
3364
3365        one_column_roundtrip(values, true);
3366    }
3367
3368    #[test]
3369    fn large_list_single_column() {
3370        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3371        let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
3372        let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
3373            "large_item",
3374            DataType::Int32,
3375            true,
3376        ))))
3377        .len(5)
3378        .add_buffer(a_value_offsets)
3379        .add_child_data(a_values.into_data())
3380        .null_bit_buffer(Some(Buffer::from([0b00011011])))
3381        .build()
3382        .unwrap();
3383
3384        // I think this setup is incorrect because this should pass
3385        assert_eq!(a_list_data.null_count(), 1);
3386
3387        let a = LargeListArray::from(a_list_data);
3388        let values = Arc::new(a);
3389
3390        one_column_roundtrip(values, true);
3391    }
3392
3393    #[test]
3394    fn list_nested_nulls() {
3395        use arrow::datatypes::Int32Type;
3396        let data = vec![
3397            Some(vec![Some(1)]),
3398            Some(vec![Some(2), Some(3)]),
3399            None,
3400            Some(vec![Some(4), Some(5), None]),
3401            Some(vec![None]),
3402            Some(vec![Some(6), Some(7)]),
3403        ];
3404
3405        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
3406        one_column_roundtrip(Arc::new(list), true);
3407
3408        let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
3409        one_column_roundtrip(Arc::new(list), true);
3410    }
3411
3412    #[test]
3413    fn struct_single_column() {
3414        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3415        let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
3416        let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
3417
3418        let values = Arc::new(s);
3419        one_column_roundtrip(values, false);
3420    }
3421
3422    #[test]
3423    fn list_and_map_coerced_names() {
3424        // Create map and list with non-Parquet naming
3425        let list_field =
3426            Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
3427        let map_field = Field::new_map(
3428            "my_map",
3429            "entries",
3430            Field::new("keys", DataType::Int32, false),
3431            Field::new("values", DataType::Int32, true),
3432            false,
3433            true,
3434        );
3435
3436        let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
3437        let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();
3438
3439        let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
3440
3441        // Write data to Parquet but coerce names to match spec
3442        let props = Some(WriterProperties::builder().set_coerce_types(true).build());
3443        let file = tempfile::tempfile().unwrap();
3444        let mut writer =
3445            ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();
3446
3447        let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
3448        writer.write(&batch).unwrap();
3449        let file_metadata = writer.close().unwrap();
3450
3451        let schema = file_metadata.file_metadata().schema();
3452        // Coerced name of "item" should be "element"
3453        let list_field = &schema.get_fields()[0].get_fields()[0];
3454        assert_eq!(list_field.get_fields()[0].name(), "element");
3455
3456        let map_field = &schema.get_fields()[1].get_fields()[0];
3457        // Coerced name of "entries" should be "key_value"
3458        assert_eq!(map_field.name(), "key_value");
3459        // Coerced name of "keys" should be "key"
3460        assert_eq!(map_field.get_fields()[0].name(), "key");
3461        // Coerced name of "values" should be "value"
3462        assert_eq!(map_field.get_fields()[1].name(), "value");
3463
3464        // Double check schema after reading from the file
3465        let reader = SerializedFileReader::new(file).unwrap();
3466        let file_schema = reader.metadata().file_metadata().schema();
3467        let fields = file_schema.get_fields();
3468        let list_field = &fields[0].get_fields()[0];
3469        assert_eq!(list_field.get_fields()[0].name(), "element");
3470        let map_field = &fields[1].get_fields()[0];
3471        assert_eq!(map_field.name(), "key_value");
3472        assert_eq!(map_field.get_fields()[0].name(), "key");
3473        assert_eq!(map_field.get_fields()[1].name(), "value");
3474    }
3475
3476    #[test]
3477    fn fallback_flush_data_page() {
3478        //tests if the Fallback::flush_data_page clears all buffers correctly
3479        let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
3480        let values = Arc::new(StringArray::from(raw_values));
3481        let encodings = vec![
3482            Encoding::DELTA_BYTE_ARRAY,
3483            Encoding::DELTA_LENGTH_BYTE_ARRAY,
3484        ];
3485        let data_type = values.data_type().clone();
3486        let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
3487        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3488
3489        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
3490        let data_page_size_limit: usize = 32;
3491        let write_batch_size: usize = 16;
3492
3493        for encoding in &encodings {
3494            for row_group_size in row_group_sizes {
3495                let props = WriterProperties::builder()
3496                    .set_writer_version(WriterVersion::PARQUET_2_0)
3497                    .set_max_row_group_row_count(Some(row_group_size))
3498                    .set_dictionary_enabled(false)
3499                    .set_encoding(*encoding)
3500                    .set_data_page_size_limit(data_page_size_limit)
3501                    .set_write_batch_size(write_batch_size)
3502                    .build();
3503
3504                roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
3505                    let string_array_a = StringArray::from(a.clone());
3506                    let string_array_b = StringArray::from(b.clone());
3507                    let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
3508                    let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
3509                    assert_eq!(
3510                        vec_a, vec_b,
3511                        "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
3512                    );
3513                });
3514            }
3515        }
3516    }
3517
3518    #[test]
3519    fn arrow_writer_string_dictionary() {
3520        // define schema
3521        #[allow(deprecated)]
3522        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3523            "dictionary",
3524            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3525            true,
3526            42,
3527            true,
3528        )]));
3529
3530        // create some data
3531        let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3532            .iter()
3533            .copied()
3534            .collect();
3535
3536        // build a record batch
3537        one_column_roundtrip_with_schema(Arc::new(d), schema);
3538    }
3539
    /// Writes two batches whose column types differ but are write-compatible, then reads the
    /// file back and checks the data comes out in the first batch's type.
    #[test]
    fn arrow_writer_test_type_compatibility() {
        /// Creates a writer from `array1`'s schema, writes `array1` and then `array2` (whose batch
        /// carries its own schema) to column "a", and reads the first batch back with a batch
        /// size of 1024. Asserts that batch equals `expected_result` under `array1`'s schema.
        fn ensure_compatible_write<T1, T2>(array1: T1, array2: T2, expected_result: T1)
        where
            T1: Array + 'static,
            T2: Array + 'static,
        {
            let schema1 = Arc::new(Schema::new(vec![Field::new(
                "a",
                array1.data_type().clone(),
                false,
            )]));

            let file = tempfile().unwrap();
            let mut writer =
                ArrowWriter::try_new(file.try_clone().unwrap(), schema1.clone(), None).unwrap();

            let rb1 = RecordBatch::try_new(schema1.clone(), vec![Arc::new(array1)]).unwrap();
            writer.write(&rb1).unwrap();

            // The second batch uses array2's data type, not the writer's schema1.
            let schema2 = Arc::new(Schema::new(vec![Field::new(
                "a",
                array2.data_type().clone(),
                false,
            )]));
            let rb2 = RecordBatch::try_new(schema2, vec![Arc::new(array2)]).unwrap();
            writer.write(&rb2).unwrap();

            writer.close().unwrap();

            let mut record_batch_reader =
                ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
            let actual_batch = record_batch_reader.next().unwrap().unwrap();

            // Both writes are expected to come back as one batch typed like array1.
            let expected_batch =
                RecordBatch::try_new(schema1, vec![Arc::new(expected_result)]).unwrap();
            assert_eq!(actual_batch, expected_batch);
        }

        // check compatibility between native and dictionaries

        ensure_compatible_write(
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0]),
                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
            ),
            StringArray::from_iter_values(vec!["barquet"]),
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0, 1]),
                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
            ),
        );

        ensure_compatible_write(
            StringArray::from_iter_values(vec!["parquet"]),
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0]),
                Arc::new(StringArray::from_iter_values(vec!["barquet"])),
            ),
            StringArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        // check compatibility between dictionaries with different key types

        ensure_compatible_write(
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0]),
                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
            ),
            DictionaryArray::new(
                UInt16Array::from_iter_values(vec![0]),
                Arc::new(StringArray::from_iter_values(vec!["barquet"])),
            ),
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0, 1]),
                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
            ),
        );

        // check compatibility between dictionaries with different value types
        ensure_compatible_write(
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0]),
                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
            ),
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0]),
                Arc::new(LargeStringArray::from_iter_values(vec!["barquet"])),
            ),
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0, 1]),
                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
            ),
        );

        // check compatibility between a dictionary and a native array with a different type
        ensure_compatible_write(
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0]),
                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
            ),
            LargeStringArray::from_iter_values(vec!["barquet"]),
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0, 1]),
                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
            ),
        );

        // check compatibility for string types (each pair in both directions)

        ensure_compatible_write(
            StringArray::from_iter_values(vec!["parquet"]),
            LargeStringArray::from_iter_values(vec!["barquet"]),
            StringArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        ensure_compatible_write(
            LargeStringArray::from_iter_values(vec!["parquet"]),
            StringArray::from_iter_values(vec!["barquet"]),
            LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        ensure_compatible_write(
            StringArray::from_iter_values(vec!["parquet"]),
            StringViewArray::from_iter_values(vec!["barquet"]),
            StringArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        ensure_compatible_write(
            StringViewArray::from_iter_values(vec!["parquet"]),
            StringArray::from_iter_values(vec!["barquet"]),
            StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        ensure_compatible_write(
            LargeStringArray::from_iter_values(vec!["parquet"]),
            StringViewArray::from_iter_values(vec!["barquet"]),
            LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        ensure_compatible_write(
            StringViewArray::from_iter_values(vec!["parquet"]),
            LargeStringArray::from_iter_values(vec!["barquet"]),
            StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        // check compatibility for binary types (each pair in both directions)

        ensure_compatible_write(
            BinaryArray::from_iter_values(vec![b"parquet"]),
            LargeBinaryArray::from_iter_values(vec![b"barquet"]),
            BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
        );

        ensure_compatible_write(
            LargeBinaryArray::from_iter_values(vec![b"parquet"]),
            BinaryArray::from_iter_values(vec![b"barquet"]),
            LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
        );

        ensure_compatible_write(
            BinaryArray::from_iter_values(vec![b"parquet"]),
            BinaryViewArray::from_iter_values(vec![b"barquet"]),
            BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
        );

        ensure_compatible_write(
            BinaryViewArray::from_iter_values(vec![b"parquet"]),
            BinaryArray::from_iter_values(vec![b"barquet"]),
            BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
        );

        ensure_compatible_write(
            BinaryViewArray::from_iter_values(vec![b"parquet"]),
            LargeBinaryArray::from_iter_values(vec![b"barquet"]),
            BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
        );

        ensure_compatible_write(
            LargeBinaryArray::from_iter_values(vec![b"parquet"]),
            BinaryViewArray::from_iter_values(vec![b"barquet"]),
            LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
        );

        // check compatibility for list types

        // Item-field metadata (a Parquet field id) that only the first batch's list type carries.
        let list_field_metadata = HashMap::from_iter(vec![(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            "1".to_string(),
        )]);
        let list_field = Field::new_list_field(DataType::Int32, false);

        // Batch 1: [[0, 1], [2, 3, 4]]
        let values1 = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4]));
        let offsets1 = OffsetBuffer::new(vec![0, 2, 5].into());

        // Batch 2: [[5, 6, 7], [8, 9]]
        let values2 = Arc::new(Int32Array::from(vec![5, 6, 7, 8, 9]));
        let offsets2 = OffsetBuffer::new(vec![0, 3, 5].into());

        // Expected read-back: all four lists, concatenated
        let values_expected = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]));
        let offsets_expected = OffsetBuffer::new(vec![0, 2, 5, 8, 10].into());

        ensure_compatible_write(
            // when the initial schema has the metadata ...
            ListArray::try_new(
                Arc::new(
                    list_field
                        .clone()
                        .with_metadata(list_field_metadata.clone()),
                ),
                offsets1,
                values1,
                None,
            )
            .unwrap(),
            // ... and some intermediate schema doesn't have the metadata
            ListArray::try_new(Arc::new(list_field.clone()), offsets2, values2, None).unwrap(),
            // ... the write will still go through, and the resulting schema will inherit the initial metadata
            ListArray::try_new(
                Arc::new(
                    list_field
                        .clone()
                        .with_metadata(list_field_metadata.clone()),
                ),
                offsets_expected,
                values_expected,
                None,
            )
            .unwrap(),
        );
    }
3770
3771    #[test]
3772    fn arrow_writer_primitive_dictionary() {
3773        // define schema
3774        #[allow(deprecated)]
3775        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3776            "dictionary",
3777            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
3778            true,
3779            42,
3780            true,
3781        )]));
3782
3783        // create some data
3784        let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
3785        builder.append(12345678).unwrap();
3786        builder.append_null();
3787        builder.append(22345678).unwrap();
3788        builder.append(12345678).unwrap();
3789        let d = builder.finish();
3790
3791        one_column_roundtrip_with_schema(Arc::new(d), schema);
3792    }
3793
3794    #[test]
3795    fn arrow_writer_decimal32_dictionary() {
3796        let integers = vec![12345, 56789, 34567];
3797
3798        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3799
3800        let values = Decimal32Array::from(integers.clone())
3801            .with_precision_and_scale(5, 2)
3802            .unwrap();
3803
3804        let array = DictionaryArray::new(keys, Arc::new(values));
3805        one_column_roundtrip(Arc::new(array.clone()), true);
3806
3807        let values = Decimal32Array::from(integers)
3808            .with_precision_and_scale(9, 2)
3809            .unwrap();
3810
3811        let array = array.with_values(Arc::new(values));
3812        one_column_roundtrip(Arc::new(array), true);
3813    }
3814
3815    #[test]
3816    fn arrow_writer_decimal64_dictionary() {
3817        let integers = vec![12345, 56789, 34567];
3818
3819        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3820
3821        let values = Decimal64Array::from(integers.clone())
3822            .with_precision_and_scale(5, 2)
3823            .unwrap();
3824
3825        let array = DictionaryArray::new(keys, Arc::new(values));
3826        one_column_roundtrip(Arc::new(array.clone()), true);
3827
3828        let values = Decimal64Array::from(integers)
3829            .with_precision_and_scale(12, 2)
3830            .unwrap();
3831
3832        let array = array.with_values(Arc::new(values));
3833        one_column_roundtrip(Arc::new(array), true);
3834    }
3835
3836    #[test]
3837    fn arrow_writer_decimal128_dictionary() {
3838        let integers = vec![12345, 56789, 34567];
3839
3840        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3841
3842        let values = Decimal128Array::from(integers.clone())
3843            .with_precision_and_scale(5, 2)
3844            .unwrap();
3845
3846        let array = DictionaryArray::new(keys, Arc::new(values));
3847        one_column_roundtrip(Arc::new(array.clone()), true);
3848
3849        let values = Decimal128Array::from(integers)
3850            .with_precision_and_scale(12, 2)
3851            .unwrap();
3852
3853        let array = array.with_values(Arc::new(values));
3854        one_column_roundtrip(Arc::new(array), true);
3855    }
3856
3857    #[test]
3858    fn arrow_writer_decimal256_dictionary() {
3859        let integers = vec![
3860            i256::from_i128(12345),
3861            i256::from_i128(56789),
3862            i256::from_i128(34567),
3863        ];
3864
3865        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3866
3867        let values = Decimal256Array::from(integers.clone())
3868            .with_precision_and_scale(5, 2)
3869            .unwrap();
3870
3871        let array = DictionaryArray::new(keys, Arc::new(values));
3872        one_column_roundtrip(Arc::new(array.clone()), true);
3873
3874        let values = Decimal256Array::from(integers)
3875            .with_precision_and_scale(12, 2)
3876            .unwrap();
3877
3878        let array = array.with_values(Arc::new(values));
3879        one_column_roundtrip(Arc::new(array), true);
3880    }
3881
3882    #[test]
3883    fn arrow_writer_string_dictionary_unsigned_index() {
3884        // define schema
3885        #[allow(deprecated)]
3886        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3887            "dictionary",
3888            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3889            true,
3890            42,
3891            true,
3892        )]));
3893
3894        // create some data
3895        let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3896            .iter()
3897            .copied()
3898            .collect();
3899
3900        one_column_roundtrip_with_schema(Arc::new(d), schema);
3901    }
3902
3903    #[test]
3904    fn u32_min_max() {
3905        // check values roundtrip through parquet
3906        let src = [
3907            u32::MIN,
3908            u32::MIN + 1,
3909            (i32::MAX as u32) - 1,
3910            i32::MAX as u32,
3911            (i32::MAX as u32) + 1,
3912            u32::MAX - 1,
3913            u32::MAX,
3914        ];
3915        let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
3916        let files = one_column_roundtrip(values, false);
3917
3918        for file in files {
3919            // check statistics are valid
3920            let reader = SerializedFileReader::new(file).unwrap();
3921            let metadata = reader.metadata();
3922
3923            let mut row_offset = 0;
3924            for row_group in metadata.row_groups() {
3925                assert_eq!(row_group.num_columns(), 1);
3926                let column = row_group.column(0);
3927
3928                let num_values = column.num_values() as usize;
3929                let src_slice = &src[row_offset..row_offset + num_values];
3930                row_offset += column.num_values() as usize;
3931
3932                let stats = column.statistics().unwrap();
3933                if let Statistics::Int32(stats) = stats {
3934                    assert_eq!(
3935                        *stats.min_opt().unwrap() as u32,
3936                        *src_slice.iter().min().unwrap()
3937                    );
3938                    assert_eq!(
3939                        *stats.max_opt().unwrap() as u32,
3940                        *src_slice.iter().max().unwrap()
3941                    );
3942                } else {
3943                    panic!("Statistics::Int32 missing")
3944                }
3945            }
3946        }
3947    }
3948
3949    #[test]
3950    fn u64_min_max() {
3951        // check values roundtrip through parquet
3952        let src = [
3953            u64::MIN,
3954            u64::MIN + 1,
3955            (i64::MAX as u64) - 1,
3956            i64::MAX as u64,
3957            (i64::MAX as u64) + 1,
3958            u64::MAX - 1,
3959            u64::MAX,
3960        ];
3961        let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
3962        let files = one_column_roundtrip(values, false);
3963
3964        for file in files {
3965            // check statistics are valid
3966            let reader = SerializedFileReader::new(file).unwrap();
3967            let metadata = reader.metadata();
3968
3969            let mut row_offset = 0;
3970            for row_group in metadata.row_groups() {
3971                assert_eq!(row_group.num_columns(), 1);
3972                let column = row_group.column(0);
3973
3974                let num_values = column.num_values() as usize;
3975                let src_slice = &src[row_offset..row_offset + num_values];
3976                row_offset += column.num_values() as usize;
3977
3978                let stats = column.statistics().unwrap();
3979                if let Statistics::Int64(stats) = stats {
3980                    assert_eq!(
3981                        *stats.min_opt().unwrap() as u64,
3982                        *src_slice.iter().min().unwrap()
3983                    );
3984                    assert_eq!(
3985                        *stats.max_opt().unwrap() as u64,
3986                        *src_slice.iter().max().unwrap()
3987                    );
3988                } else {
3989                    panic!("Statistics::Int64 missing")
3990                }
3991            }
3992        }
3993    }
3994
3995    #[test]
3996    fn statistics_null_counts_only_nulls() {
3997        // check that null-count statistics for "only NULL"-columns are correct
3998        let values = Arc::new(UInt64Array::from(vec![None, None]));
3999        let files = one_column_roundtrip(values, true);
4000
4001        for file in files {
4002            // check statistics are valid
4003            let reader = SerializedFileReader::new(file).unwrap();
4004            let metadata = reader.metadata();
4005            assert_eq!(metadata.num_row_groups(), 1);
4006            let row_group = metadata.row_group(0);
4007            assert_eq!(row_group.num_columns(), 1);
4008            let column = row_group.column(0);
4009            let stats = column.statistics().unwrap();
4010            assert_eq!(stats.null_count_opt(), Some(2));
4011        }
4012    }
4013
4014    #[test]
4015    fn test_list_of_struct_roundtrip() {
4016        // define schema
4017        let int_field = Field::new("a", DataType::Int32, true);
4018        let int_field2 = Field::new("b", DataType::Int32, true);
4019
4020        let int_builder = Int32Builder::with_capacity(10);
4021        let int_builder2 = Int32Builder::with_capacity(10);
4022
4023        let struct_builder = StructBuilder::new(
4024            vec![int_field, int_field2],
4025            vec![Box::new(int_builder), Box::new(int_builder2)],
4026        );
4027        let mut list_builder = ListBuilder::new(struct_builder);
4028
4029        // Construct the following array
4030        // [{a: 1, b: 2}], [], null, [null, null], [{a: null, b: 3}], [{a: 2, b: null}]
4031
4032        // [{a: 1, b: 2}]
4033        let values = list_builder.values();
4034        values
4035            .field_builder::<Int32Builder>(0)
4036            .unwrap()
4037            .append_value(1);
4038        values
4039            .field_builder::<Int32Builder>(1)
4040            .unwrap()
4041            .append_value(2);
4042        values.append(true);
4043        list_builder.append(true);
4044
4045        // []
4046        list_builder.append(true);
4047
4048        // null
4049        list_builder.append(false);
4050
4051        // [null, null]
4052        let values = list_builder.values();
4053        values
4054            .field_builder::<Int32Builder>(0)
4055            .unwrap()
4056            .append_null();
4057        values
4058            .field_builder::<Int32Builder>(1)
4059            .unwrap()
4060            .append_null();
4061        values.append(false);
4062        values
4063            .field_builder::<Int32Builder>(0)
4064            .unwrap()
4065            .append_null();
4066        values
4067            .field_builder::<Int32Builder>(1)
4068            .unwrap()
4069            .append_null();
4070        values.append(false);
4071        list_builder.append(true);
4072
4073        // [{a: null, b: 3}]
4074        let values = list_builder.values();
4075        values
4076            .field_builder::<Int32Builder>(0)
4077            .unwrap()
4078            .append_null();
4079        values
4080            .field_builder::<Int32Builder>(1)
4081            .unwrap()
4082            .append_value(3);
4083        values.append(true);
4084        list_builder.append(true);
4085
4086        // [{a: 2, b: null}]
4087        let values = list_builder.values();
4088        values
4089            .field_builder::<Int32Builder>(0)
4090            .unwrap()
4091            .append_value(2);
4092        values
4093            .field_builder::<Int32Builder>(1)
4094            .unwrap()
4095            .append_null();
4096        values.append(true);
4097        list_builder.append(true);
4098
4099        let array = Arc::new(list_builder.finish());
4100
4101        one_column_roundtrip(array, true);
4102    }
4103
4104    fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
4105        metadata.row_groups().iter().map(|x| x.num_rows()).collect()
4106    }
4107
4108    #[test]
4109    fn test_aggregates_records() {
4110        let arrays = [
4111            Int32Array::from((0..100).collect::<Vec<_>>()),
4112            Int32Array::from((0..50).collect::<Vec<_>>()),
4113            Int32Array::from((200..500).collect::<Vec<_>>()),
4114        ];
4115
4116        let schema = Arc::new(Schema::new(vec![Field::new(
4117            "int",
4118            ArrowDataType::Int32,
4119            false,
4120        )]));
4121
4122        let file = tempfile::tempfile().unwrap();
4123
4124        let props = WriterProperties::builder()
4125            .set_max_row_group_row_count(Some(200))
4126            .build();
4127
4128        let mut writer =
4129            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
4130
4131        for array in arrays {
4132            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
4133            writer.write(&batch).unwrap();
4134        }
4135
4136        writer.close().unwrap();
4137
4138        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4139        assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);
4140
4141        let batches = builder
4142            .with_batch_size(100)
4143            .build()
4144            .unwrap()
4145            .collect::<ArrowResult<Vec<_>>>()
4146            .unwrap();
4147
4148        assert_eq!(batches.len(), 5);
4149        assert!(batches.iter().all(|x| x.num_columns() == 1));
4150
4151        let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
4152
4153        assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
4154
4155        let values: Vec<_> = batches
4156            .iter()
4157            .flat_map(|x| {
4158                x.column(0)
4159                    .as_any()
4160                    .downcast_ref::<Int32Array>()
4161                    .unwrap()
4162                    .values()
4163                    .iter()
4164                    .cloned()
4165            })
4166            .collect();
4167
4168        let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
4169        assert_eq!(&values, &expected_values)
4170    }
4171
4172    #[test]
4173    fn complex_aggregate() {
4174        // Tests aggregating nested data
4175        let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
4176        let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
4177        let struct_a = Arc::new(Field::new(
4178            "struct_a",
4179            DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
4180            true,
4181        ));
4182
4183        let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
4184        let struct_b = Arc::new(Field::new(
4185            "struct_b",
4186            DataType::Struct(vec![list_a.clone()].into()),
4187            false,
4188        ));
4189
4190        let schema = Arc::new(Schema::new(vec![struct_b]));
4191
4192        // create nested data
4193        let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
4194        let field_b_array =
4195            Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);
4196
4197        let struct_a_array = StructArray::from(vec![
4198            (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
4199            (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
4200        ]);
4201
4202        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
4203            .len(5)
4204            .add_buffer(Buffer::from_iter(vec![
4205                0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
4206            ]))
4207            .null_bit_buffer(Some(Buffer::from_iter(vec![
4208                true, false, true, false, true,
4209            ])))
4210            .child_data(vec![struct_a_array.into_data()])
4211            .build()
4212            .unwrap();
4213
4214        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
4215        let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);
4216
4217        let batch1 =
4218            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
4219                .unwrap();
4220
4221        let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
4222        let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);
4223
4224        let struct_a_array = StructArray::from(vec![
4225            (field_a, Arc::new(field_a_array) as ArrayRef),
4226            (field_b, Arc::new(field_b_array) as ArrayRef),
4227        ]);
4228
4229        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
4230            .len(2)
4231            .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
4232            .child_data(vec![struct_a_array.into_data()])
4233            .build()
4234            .unwrap();
4235
4236        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
4237        let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);
4238
4239        let batch2 =
4240            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
4241                .unwrap();
4242
4243        let batches = &[batch1, batch2];
4244
4245        // Verify data is as expected
4246
4247        let expected = r#"
4248            +-------------------------------------------------------------------------------------------------------+
4249            | struct_b                                                                                              |
4250            +-------------------------------------------------------------------------------------------------------+
4251            | {list: [{leaf_a: 1, leaf_b: 1}]}                                                                      |
4252            | {list: }                                                                                              |
4253            | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]}                                               |
4254            | {list: }                                                                                              |
4255            | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]}                                                |
4256            | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
4257            | {list: [{leaf_a: 10, leaf_b: }]}                                                                      |
4258            +-------------------------------------------------------------------------------------------------------+
4259        "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");
4260
4261        let actual = pretty_format_batches(batches).unwrap().to_string();
4262        assert_eq!(actual, expected);
4263
4264        // Write data
4265        let file = tempfile::tempfile().unwrap();
4266        let props = WriterProperties::builder()
4267            .set_max_row_group_row_count(Some(6))
4268            .build();
4269
4270        let mut writer =
4271            ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
4272
4273        for batch in batches {
4274            writer.write(batch).unwrap();
4275        }
4276        writer.close().unwrap();
4277
4278        // Read Data
4279        // Should have written entire first batch and first row of second to the first row group
4280        // leaving a single row in the second row group
4281
4282        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4283        assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);
4284
4285        let batches = builder
4286            .with_batch_size(2)
4287            .build()
4288            .unwrap()
4289            .collect::<ArrowResult<Vec<_>>>()
4290            .unwrap();
4291
4292        assert_eq!(batches.len(), 4);
4293        let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
4294        assert_eq!(&batch_counts, &[2, 2, 2, 1]);
4295
4296        let actual = pretty_format_batches(&batches).unwrap().to_string();
4297        assert_eq!(actual, expected);
4298    }
4299
4300    #[test]
4301    fn test_arrow_writer_metadata() {
4302        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
4303        let file_schema = batch_schema.clone().with_metadata(
4304            vec![("foo".to_string(), "bar".to_string())]
4305                .into_iter()
4306                .collect(),
4307        );
4308
4309        let batch = RecordBatch::try_new(
4310            Arc::new(batch_schema),
4311            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4312        )
4313        .unwrap();
4314
4315        let mut buf = Vec::with_capacity(1024);
4316        let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
4317        writer.write(&batch).unwrap();
4318        writer.close().unwrap();
4319    }
4320
4321    #[test]
4322    fn test_arrow_writer_nullable() {
4323        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
4324        let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
4325        let file_schema = Arc::new(file_schema);
4326
4327        let batch = RecordBatch::try_new(
4328            Arc::new(batch_schema),
4329            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4330        )
4331        .unwrap();
4332
4333        let mut buf = Vec::with_capacity(1024);
4334        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
4335        writer.write(&batch).unwrap();
4336        writer.close().unwrap();
4337
4338        let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
4339        let back = read.next().unwrap().unwrap();
4340        assert_eq!(back.schema(), file_schema);
4341        assert_ne!(back.schema(), batch.schema());
4342        assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
4343    }
4344
4345    #[test]
4346    fn in_progress_accounting() {
4347        // define schema
4348        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
4349
4350        // create some data
4351        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
4352
4353        // build a record batch
4354        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
4355
4356        let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
4357
4358        // starts empty
4359        assert_eq!(writer.in_progress_size(), 0);
4360        assert_eq!(writer.in_progress_rows(), 0);
4361        assert_eq!(writer.memory_size(), 0);
4362        assert_eq!(writer.bytes_written(), 4); // Initial header
4363        writer.write(&batch).unwrap();
4364
4365        // updated on write
4366        let initial_size = writer.in_progress_size();
4367        assert!(initial_size > 0);
4368        assert_eq!(writer.in_progress_rows(), 5);
4369        let initial_memory = writer.memory_size();
4370        assert!(initial_memory > 0);
4371        // memory estimate is larger than estimated encoded size
4372        assert!(
4373            initial_size <= initial_memory,
4374            "{initial_size} <= {initial_memory}"
4375        );
4376
4377        // updated on second write
4378        writer.write(&batch).unwrap();
4379        assert!(writer.in_progress_size() > initial_size);
4380        assert_eq!(writer.in_progress_rows(), 10);
4381        assert!(writer.memory_size() > initial_memory);
4382        assert!(
4383            writer.in_progress_size() <= writer.memory_size(),
4384            "in_progress_size {} <= memory_size {}",
4385            writer.in_progress_size(),
4386            writer.memory_size()
4387        );
4388
4389        // in progress tracking is cleared, but the overall data written is updated
4390        let pre_flush_bytes_written = writer.bytes_written();
4391        writer.flush().unwrap();
4392        assert_eq!(writer.in_progress_size(), 0);
4393        assert_eq!(writer.memory_size(), 0);
4394        assert!(writer.bytes_written() > pre_flush_bytes_written);
4395
4396        writer.close().unwrap();
4397    }
4398
4399    #[test]
4400    fn test_writer_all_null() {
4401        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
4402        let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
4403        let batch = RecordBatch::try_from_iter(vec![
4404            ("a", Arc::new(a) as ArrayRef),
4405            ("b", Arc::new(b) as ArrayRef),
4406        ])
4407        .unwrap();
4408
4409        let mut buf = Vec::with_capacity(1024);
4410        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
4411        writer.write(&batch).unwrap();
4412        writer.close().unwrap();
4413
4414        let bytes = Bytes::from(buf);
4415        let options = ReadOptionsBuilder::new().with_page_index().build();
4416        let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
4417        let index = reader.metadata().offset_index().unwrap();
4418
4419        assert_eq!(index.len(), 1);
4420        assert_eq!(index[0].len(), 2); // 2 columns
4421        assert_eq!(index[0][0].page_locations().len(), 1); // 1 page
4422        assert_eq!(index[0][1].page_locations().len(), 1); // 1 page
4423    }
4424
4425    #[test]
4426    fn test_disabled_statistics_with_page() {
4427        let file_schema = Schema::new(vec![
4428            Field::new("a", DataType::Utf8, true),
4429            Field::new("b", DataType::Utf8, true),
4430        ]);
4431        let file_schema = Arc::new(file_schema);
4432
4433        let batch = RecordBatch::try_new(
4434            file_schema.clone(),
4435            vec![
4436                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
4437                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
4438            ],
4439        )
4440        .unwrap();
4441
4442        let props = WriterProperties::builder()
4443            .set_statistics_enabled(EnabledStatistics::None)
4444            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
4445            .build();
4446
4447        let mut buf = Vec::with_capacity(1024);
4448        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
4449        writer.write(&batch).unwrap();
4450
4451        let metadata = writer.close().unwrap();
4452        assert_eq!(metadata.num_row_groups(), 1);
4453        let row_group = metadata.row_group(0);
4454        assert_eq!(row_group.num_columns(), 2);
4455        // Column "a" has both offset and column index, as requested
4456        assert!(row_group.column(0).offset_index_offset().is_some());
4457        assert!(row_group.column(0).column_index_offset().is_some());
4458        // Column "b" should only have offset index
4459        assert!(row_group.column(1).offset_index_offset().is_some());
4460        assert!(row_group.column(1).column_index_offset().is_none());
4461
4462        let options = ReadOptionsBuilder::new().with_page_index().build();
4463        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
4464
4465        let row_group = reader.get_row_group(0).unwrap();
4466        let a_col = row_group.metadata().column(0);
4467        let b_col = row_group.metadata().column(1);
4468
4469        // Column chunk of column "a" should have chunk level statistics
4470        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
4471            let min = byte_array_stats.min_opt().unwrap();
4472            let max = byte_array_stats.max_opt().unwrap();
4473
4474            assert_eq!(min.as_bytes(), b"a");
4475            assert_eq!(max.as_bytes(), b"d");
4476        } else {
4477            panic!("expecting Statistics::ByteArray");
4478        }
4479
4480        // The column chunk for column "b" shouldn't have statistics
4481        assert!(b_col.statistics().is_none());
4482
4483        let offset_index = reader.metadata().offset_index().unwrap();
4484        assert_eq!(offset_index.len(), 1); // 1 row group
4485        assert_eq!(offset_index[0].len(), 2); // 2 columns
4486
4487        let column_index = reader.metadata().column_index().unwrap();
4488        assert_eq!(column_index.len(), 1); // 1 row group
4489        assert_eq!(column_index[0].len(), 2); // 2 columns
4490
4491        let a_idx = &column_index[0][0];
4492        assert!(
4493            matches!(a_idx, ColumnIndexMetaData::BYTE_ARRAY(_)),
4494            "{a_idx:?}"
4495        );
4496        let b_idx = &column_index[0][1];
4497        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
4498    }
4499
4500    #[test]
4501    fn test_disabled_statistics_with_chunk() {
4502        let file_schema = Schema::new(vec![
4503            Field::new("a", DataType::Utf8, true),
4504            Field::new("b", DataType::Utf8, true),
4505        ]);
4506        let file_schema = Arc::new(file_schema);
4507
4508        let batch = RecordBatch::try_new(
4509            file_schema.clone(),
4510            vec![
4511                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
4512                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
4513            ],
4514        )
4515        .unwrap();
4516
4517        let props = WriterProperties::builder()
4518            .set_statistics_enabled(EnabledStatistics::None)
4519            .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
4520            .build();
4521
4522        let mut buf = Vec::with_capacity(1024);
4523        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
4524        writer.write(&batch).unwrap();
4525
4526        let metadata = writer.close().unwrap();
4527        assert_eq!(metadata.num_row_groups(), 1);
4528        let row_group = metadata.row_group(0);
4529        assert_eq!(row_group.num_columns(), 2);
4530        // Column "a" should only have offset index
4531        assert!(row_group.column(0).offset_index_offset().is_some());
4532        assert!(row_group.column(0).column_index_offset().is_none());
4533        // Column "b" should only have offset index
4534        assert!(row_group.column(1).offset_index_offset().is_some());
4535        assert!(row_group.column(1).column_index_offset().is_none());
4536
4537        let options = ReadOptionsBuilder::new().with_page_index().build();
4538        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
4539
4540        let row_group = reader.get_row_group(0).unwrap();
4541        let a_col = row_group.metadata().column(0);
4542        let b_col = row_group.metadata().column(1);
4543
4544        // Column chunk of column "a" should have chunk level statistics
4545        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
4546            let min = byte_array_stats.min_opt().unwrap();
4547            let max = byte_array_stats.max_opt().unwrap();
4548
4549            assert_eq!(min.as_bytes(), b"a");
4550            assert_eq!(max.as_bytes(), b"d");
4551        } else {
4552            panic!("expecting Statistics::ByteArray");
4553        }
4554
4555        // The column chunk for column "b"  shouldn't have statistics
4556        assert!(b_col.statistics().is_none());
4557
4558        let column_index = reader.metadata().column_index().unwrap();
4559        assert_eq!(column_index.len(), 1); // 1 row group
4560        assert_eq!(column_index[0].len(), 2); // 2 columns
4561
4562        let a_idx = &column_index[0][0];
4563        assert!(matches!(a_idx, ColumnIndexMetaData::NONE), "{a_idx:?}");
4564        let b_idx = &column_index[0][1];
4565        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
4566    }
4567
4568    #[test]
4569    fn test_arrow_writer_skip_metadata() {
4570        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
4571        let file_schema = Arc::new(batch_schema.clone());
4572
4573        let batch = RecordBatch::try_new(
4574            Arc::new(batch_schema),
4575            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4576        )
4577        .unwrap();
4578        let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
4579
4580        let mut buf = Vec::with_capacity(1024);
4581        let mut writer =
4582            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
4583        writer.write(&batch).unwrap();
4584        writer.close().unwrap();
4585
4586        let bytes = Bytes::from(buf);
4587        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
4588        assert_eq!(file_schema, *reader_builder.schema());
4589        if let Some(key_value_metadata) = reader_builder
4590            .metadata()
4591            .file_metadata()
4592            .key_value_metadata()
4593        {
4594            assert!(
4595                !key_value_metadata
4596                    .iter()
4597                    .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY)
4598            );
4599        }
4600    }
4601
4602    #[test]
4603    fn test_arrow_writer_skip_path_in_schema() {
4604        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
4605        let file_schema = Arc::new(batch_schema.clone());
4606
4607        let batch = RecordBatch::try_new(
4608            Arc::new(batch_schema),
4609            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4610        )
4611        .unwrap();
4612
4613        // default options should still write path_in_schema
4614        let skip_options = ArrowWriterOptions::new();
4615
4616        let mut buf = Vec::with_capacity(1024);
4617        let mut writer =
4618            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
4619        writer.write(&batch).unwrap();
4620        writer.close().unwrap();
4621
4622        // override to not write path_in_schema
4623        let skip_options = ArrowWriterOptions::new().with_properties(
4624            WriterProperties::builder()
4625                .set_write_path_in_schema(false)
4626                .build(),
4627        );
4628
4629        let mut buf2 = Vec::with_capacity(1024);
4630        let mut writer =
4631            ArrowWriter::try_new_with_options(&mut buf2, file_schema.clone(), skip_options)
4632                .unwrap();
4633        writer.write(&batch).unwrap();
4634        writer.close().unwrap();
4635
4636        // buf2 should be a bit smaller due to lack of path_in_schema
4637        assert!(buf.len() > buf2.len());
4638    }
4639
4640    #[test]
4641    fn mismatched_schemas() {
4642        let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
4643        let file_schema = Arc::new(Schema::new(vec![Field::new(
4644            "temperature",
4645            DataType::Float64,
4646            false,
4647        )]));
4648
4649        let batch = RecordBatch::try_new(
4650            Arc::new(batch_schema),
4651            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4652        )
4653        .unwrap();
4654
4655        let mut buf = Vec::with_capacity(1024);
4656        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
4657
4658        let err = writer.write(&batch).unwrap_err().to_string();
4659        assert_eq!(
4660            err,
4661            "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
4662        );
4663    }
4664
4665    #[test]
4666    // https://github.com/apache/arrow-rs/issues/6988
4667    fn test_roundtrip_empty_schema() {
4668        // create empty record batch with empty schema
4669        let empty_batch = RecordBatch::try_new_with_options(
4670            Arc::new(Schema::empty()),
4671            vec![],
4672            &RecordBatchOptions::default().with_row_count(Some(0)),
4673        )
4674        .unwrap();
4675
4676        // write to parquet
4677        let mut parquet_bytes: Vec<u8> = Vec::new();
4678        let mut writer =
4679            ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
4680        writer.write(&empty_batch).unwrap();
4681        writer.close().unwrap();
4682
4683        // read from parquet
4684        let bytes = Bytes::from(parquet_bytes);
4685        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
4686        assert_eq!(reader.schema(), &empty_batch.schema());
4687        let batches: Vec<_> = reader
4688            .build()
4689            .unwrap()
4690            .collect::<ArrowResult<Vec<_>>>()
4691            .unwrap();
4692        assert_eq!(batches.len(), 0);
4693    }
4694
4695    #[test]
4696    fn test_page_stats_not_written_by_default() {
4697        let string_field = Field::new("a", DataType::Utf8, false);
4698        let schema = Schema::new(vec![string_field]);
4699        let raw_string_values = vec!["Blart Versenwald III"];
4700        let string_values = StringArray::from(raw_string_values.clone());
4701        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();
4702
4703        let props = WriterProperties::builder()
4704            .set_statistics_enabled(EnabledStatistics::Page)
4705            .set_dictionary_enabled(false)
4706            .set_encoding(Encoding::PLAIN)
4707            .set_compression(crate::basic::Compression::UNCOMPRESSED)
4708            .build();
4709
4710        let file = roundtrip_opts(&batch, props);
4711
4712        // read file and decode page headers
4713        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
4714
4715        // decode first page header
4716        let first_page = &file[4..];
4717        let mut prot = ThriftSliceInputProtocol::new(first_page);
4718        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
4719        let stats = hdr.data_page_header.unwrap().statistics;
4720
4721        assert!(stats.is_none());
4722    }
4723
4724    #[test]
4725    fn test_page_stats_when_enabled() {
4726        let string_field = Field::new("a", DataType::Utf8, false);
4727        let schema = Schema::new(vec![string_field]);
4728        let raw_string_values = vec!["Blart Versenwald III", "Andrew Lamb"];
4729        let string_values = StringArray::from(raw_string_values.clone());
4730        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();
4731
4732        let props = WriterProperties::builder()
4733            .set_statistics_enabled(EnabledStatistics::Page)
4734            .set_dictionary_enabled(false)
4735            .set_encoding(Encoding::PLAIN)
4736            .set_write_page_header_statistics(true)
4737            .set_compression(crate::basic::Compression::UNCOMPRESSED)
4738            .build();
4739
4740        let file = roundtrip_opts(&batch, props);
4741
4742        // read file and decode page headers
4743        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
4744
4745        // decode first page header
4746        let first_page = &file[4..];
4747        let mut prot = ThriftSliceInputProtocol::new(first_page);
4748        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
4749        let stats = hdr.data_page_header.unwrap().statistics;
4750
4751        let stats = stats.unwrap();
4752        // check that min/max were actually written to the page
4753        assert!(stats.is_max_value_exact.unwrap());
4754        assert!(stats.is_min_value_exact.unwrap());
4755        assert_eq!(stats.max_value.unwrap(), "Blart Versenwald III".as_bytes());
4756        assert_eq!(stats.min_value.unwrap(), "Andrew Lamb".as_bytes());
4757    }
4758
4759    #[test]
4760    fn test_page_stats_truncation() {
4761        let string_field = Field::new("a", DataType::Utf8, false);
4762        let binary_field = Field::new("b", DataType::Binary, false);
4763        let schema = Schema::new(vec![string_field, binary_field]);
4764
4765        let raw_string_values = vec!["Blart Versenwald III"];
4766        let raw_binary_values = [b"Blart Versenwald III".to_vec()];
4767        let raw_binary_value_refs = raw_binary_values
4768            .iter()
4769            .map(|x| x.as_slice())
4770            .collect::<Vec<_>>();
4771
4772        let string_values = StringArray::from(raw_string_values.clone());
4773        let binary_values = BinaryArray::from(raw_binary_value_refs);
4774        let batch = RecordBatch::try_new(
4775            Arc::new(schema),
4776            vec![Arc::new(string_values), Arc::new(binary_values)],
4777        )
4778        .unwrap();
4779
4780        let props = WriterProperties::builder()
4781            .set_statistics_truncate_length(Some(2))
4782            .set_dictionary_enabled(false)
4783            .set_encoding(Encoding::PLAIN)
4784            .set_write_page_header_statistics(true)
4785            .set_compression(crate::basic::Compression::UNCOMPRESSED)
4786            .build();
4787
4788        let file = roundtrip_opts(&batch, props);
4789
4790        // read file and decode page headers
4791        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
4792
4793        // decode first page header
4794        let first_page = &file[4..];
4795        let mut prot = ThriftSliceInputProtocol::new(first_page);
4796        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
4797        let stats = hdr.data_page_header.unwrap().statistics;
4798        assert!(stats.is_some());
4799        let stats = stats.unwrap();
4800        // check that min/max were properly truncated
4801        assert!(!stats.is_max_value_exact.unwrap());
4802        assert!(!stats.is_min_value_exact.unwrap());
4803        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
4804        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
4805
4806        // check second page now
4807        let second_page = &prot.as_slice()[hdr.compressed_page_size as usize..];
4808        let mut prot = ThriftSliceInputProtocol::new(second_page);
4809        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
4810        let stats = hdr.data_page_header.unwrap().statistics;
4811        assert!(stats.is_some());
4812        let stats = stats.unwrap();
4813        // check that min/max were properly truncated
4814        assert!(!stats.is_max_value_exact.unwrap());
4815        assert!(!stats.is_min_value_exact.unwrap());
4816        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
4817        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
4818    }
4819
4820    #[test]
4821    fn test_page_encoding_statistics_roundtrip() {
4822        let batch_schema = Schema::new(vec![Field::new(
4823            "int32",
4824            arrow_schema::DataType::Int32,
4825            false,
4826        )]);
4827
4828        let batch = RecordBatch::try_new(
4829            Arc::new(batch_schema.clone()),
4830            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4831        )
4832        .unwrap();
4833
4834        let mut file: File = tempfile::tempfile().unwrap();
4835        let mut writer = ArrowWriter::try_new(&mut file, Arc::new(batch_schema), None).unwrap();
4836        writer.write(&batch).unwrap();
4837        let file_metadata = writer.close().unwrap();
4838
4839        assert_eq!(file_metadata.num_row_groups(), 1);
4840        assert_eq!(file_metadata.row_group(0).num_columns(), 1);
4841        assert!(
4842            file_metadata
4843                .row_group(0)
4844                .column(0)
4845                .page_encoding_stats()
4846                .is_some()
4847        );
4848        let chunk_page_stats = file_metadata
4849            .row_group(0)
4850            .column(0)
4851            .page_encoding_stats()
4852            .unwrap();
4853
4854        // check that the read metadata is also correct
4855        let options = ReadOptionsBuilder::new()
4856            .with_page_index()
4857            .with_encoding_stats_as_mask(false)
4858            .build();
4859        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
4860
4861        let rowgroup = reader.get_row_group(0).expect("row group missing");
4862        assert_eq!(rowgroup.num_columns(), 1);
4863        let column = rowgroup.metadata().column(0);
4864        assert!(column.page_encoding_stats().is_some());
4865        let file_page_stats = column.page_encoding_stats().unwrap();
4866        assert_eq!(chunk_page_stats, file_page_stats);
4867    }
4868
4869    #[test]
4870    fn test_different_dict_page_size_limit() {
4871        let array = Arc::new(Int64Array::from_iter(0..1024 * 1024));
4872        let schema = Arc::new(Schema::new(vec![
4873            Field::new("col0", arrow_schema::DataType::Int64, false),
4874            Field::new("col1", arrow_schema::DataType::Int64, false),
4875        ]));
4876        let batch =
4877            arrow_array::RecordBatch::try_new(schema.clone(), vec![array.clone(), array]).unwrap();
4878
4879        let props = WriterProperties::builder()
4880            .set_dictionary_page_size_limit(1024 * 1024)
4881            .set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 1024 * 1024 * 4)
4882            .build();
4883        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
4884        writer.write(&batch).unwrap();
4885        let data = Bytes::from(writer.into_inner().unwrap());
4886
4887        let mut metadata = ParquetMetaDataReader::new();
4888        metadata.try_parse(&data).unwrap();
4889        let metadata = metadata.finish().unwrap();
4890        let col0_meta = metadata.row_group(0).column(0);
4891        let col1_meta = metadata.row_group(0).column(1);
4892
4893        let get_dict_page_size = move |meta: &ColumnChunkMetaData| {
4894            let mut reader =
4895                SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
4896            let page = reader.get_next_page().unwrap().unwrap();
4897            match page {
4898                Page::DictionaryPage { buf, .. } => buf.len(),
4899                _ => panic!("expected DictionaryPage"),
4900            }
4901        };
4902
4903        assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
4904        assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
4905    }
4906
4907    struct WriteBatchesShape {
4908        num_batches: usize,
4909        rows_per_batch: usize,
4910        row_size: usize,
4911    }
4912
4913    /// Helper function to write batches with the provided `WriteBatchesShape` into an `ArrowWriter`
4914    fn write_batches(
4915        WriteBatchesShape {
4916            num_batches,
4917            rows_per_batch,
4918            row_size,
4919        }: WriteBatchesShape,
4920        props: WriterProperties,
4921    ) -> ParquetRecordBatchReaderBuilder<File> {
4922        let schema = Arc::new(Schema::new(vec![Field::new(
4923            "str",
4924            ArrowDataType::Utf8,
4925            false,
4926        )]));
4927        let file = tempfile::tempfile().unwrap();
4928        let mut writer =
4929            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
4930
4931        for batch_idx in 0..num_batches {
4932            let strings: Vec<String> = (0..rows_per_batch)
4933                .map(|i| format!("{:0>width$}", batch_idx * 10 + i, width = row_size))
4934                .collect();
4935            let array = StringArray::from(strings);
4936            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
4937            writer.write(&batch).unwrap();
4938        }
4939        writer.close().unwrap();
4940        ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
4941    }
4942
4943    #[test]
4944    // When both limits are None, all data should go into a single row group
4945    fn test_row_group_limit_none_writes_single_row_group() {
4946        let props = WriterProperties::builder()
4947            .set_max_row_group_row_count(None)
4948            .set_max_row_group_bytes(None)
4949            .build();
4950
4951        let builder = write_batches(
4952            WriteBatchesShape {
4953                num_batches: 1,
4954                rows_per_batch: 1000,
4955                row_size: 4,
4956            },
4957            props,
4958        );
4959
4960        assert_eq!(
4961            &row_group_sizes(builder.metadata()),
4962            &[1000],
4963            "With no limits, all rows should be in a single row group"
4964        );
4965    }
4966
4967    #[test]
4968    // When only max_row_group_size is set, respect the row limit
4969    fn test_row_group_limit_rows_only() {
4970        let props = WriterProperties::builder()
4971            .set_max_row_group_row_count(Some(300))
4972            .set_max_row_group_bytes(None)
4973            .build();
4974
4975        let builder = write_batches(
4976            WriteBatchesShape {
4977                num_batches: 1,
4978                rows_per_batch: 1000,
4979                row_size: 4,
4980            },
4981            props,
4982        );
4983
4984        assert_eq!(
4985            &row_group_sizes(builder.metadata()),
4986            &[300, 300, 300, 100],
4987            "Row groups should be split by row count"
4988        );
4989    }
4990
4991    #[test]
4992    // When only max_row_group_bytes is set, respect the byte limit
4993    fn test_row_group_limit_bytes_only() {
4994        let props = WriterProperties::builder()
4995            .set_max_row_group_row_count(None)
4996            // Set byte limit to approximately fit ~30 rows worth of data (~100 bytes each)
4997            .set_max_row_group_bytes(Some(3500))
4998            .build();
4999
5000        let builder = write_batches(
5001            WriteBatchesShape {
5002                num_batches: 10,
5003                rows_per_batch: 10,
5004                row_size: 100,
5005            },
5006            props,
5007        );
5008
5009        let sizes = row_group_sizes(builder.metadata());
5010
5011        assert!(
5012            sizes.len() > 1,
5013            "Should have multiple row groups due to byte limit, got {sizes:?}",
5014        );
5015
5016        let total_rows: i64 = sizes.iter().sum();
5017        assert_eq!(total_rows, 100, "Total rows should be preserved");
5018    }
5019
5020    #[test]
5021    // If an in-progress row group is already oversized, it should be flushed before writing more.
5022    fn test_row_group_limit_bytes_flushes_when_current_group_already_too_large() {
5023        let schema = Arc::new(Schema::new(vec![Field::new(
5024            "str",
5025            ArrowDataType::Utf8,
5026            false,
5027        )]));
5028        let file = tempfile::tempfile().unwrap();
5029
5030        // Start with no byte limit so we can intentionally build an oversized in-progress row group.
5031        let props = WriterProperties::builder()
5032            .set_max_row_group_row_count(None)
5033            .set_max_row_group_bytes(None)
5034            .build();
5035        let mut writer =
5036            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
5037
5038        let first_array = StringArray::from(
5039            (0..10)
5040                .map(|i| format!("{:0>100}", i))
5041                .collect::<Vec<String>>(),
5042        );
5043        let first_batch =
5044            RecordBatch::try_new(schema.clone(), vec![Arc::new(first_array)]).unwrap();
5045        writer.write(&first_batch).unwrap();
5046        assert_eq!(writer.in_progress_rows(), 10);
5047
5048        // Tighten the limit below the current in-progress bytes to exercise:
5049        // `if current_bytes >= max_bytes { self.flush()?; ... }`
5050        writer.max_row_group_bytes = Some(1);
5051
5052        let second_array = StringArray::from(vec!["x".to_string()]);
5053        let second_batch =
5054            RecordBatch::try_new(schema.clone(), vec![Arc::new(second_array)]).unwrap();
5055        writer.write(&second_batch).unwrap();
5056        writer.close().unwrap();
5057        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
5058
5059        assert_eq!(
5060            &row_group_sizes(builder.metadata()),
5061            &[10, 1],
5062            "The second write should flush an oversized in-progress row group first",
5063        );
5064    }
5065
5066    #[test]
5067    // When both limits are set, the row limit triggers first
5068    fn test_row_group_limit_both_row_wins_single_batch() {
5069        let props = WriterProperties::builder()
5070            .set_max_row_group_row_count(Some(200)) // Will trigger at 200 rows
5071            .set_max_row_group_bytes(Some(1024 * 1024)) // 1MB - won't trigger for small int data
5072            .build();
5073
5074        let builder = write_batches(
5075            WriteBatchesShape {
5076                num_batches: 1,
5077                row_size: 4,
5078                rows_per_batch: 1000,
5079            },
5080            props,
5081        );
5082
5083        assert_eq!(
5084            &row_group_sizes(builder.metadata()),
5085            &[200, 200, 200, 200, 200],
5086            "Row limit should trigger before byte limit"
5087        );
5088    }
5089
5090    #[test]
5091    // When both limits are set, the row limit triggers first
5092    fn test_row_group_limit_both_row_wins_multiple_batches() {
5093        let props = WriterProperties::builder()
5094            .set_max_row_group_row_count(Some(5)) // Will trigger every 5 rows
5095            .set_max_row_group_bytes(Some(9999)) // Won't trigger
5096            .build();
5097
5098        let builder = write_batches(
5099            WriteBatchesShape {
5100                num_batches: 10,
5101                rows_per_batch: 10,
5102                row_size: 100,
5103            },
5104            props,
5105        );
5106
5107        assert_eq!(
5108            &row_group_sizes(builder.metadata()),
5109            &[5; 20],
5110            "Row limit should trigger before byte limit"
5111        );
5112    }
5113
5114    #[test]
5115    // When both limits are set, the byte limit triggers first
5116    fn test_row_group_limit_both_bytes_wins() {
5117        let props = WriterProperties::builder()
5118            .set_max_row_group_row_count(Some(1000)) // Won't trigger for 100 rows
5119            .set_max_row_group_bytes(Some(3500)) // Will trigger at ~30-35 rows
5120            .build();
5121
5122        let builder = write_batches(
5123            WriteBatchesShape {
5124                num_batches: 10,
5125                rows_per_batch: 10,
5126                row_size: 100,
5127            },
5128            props,
5129        );
5130
5131        let sizes = row_group_sizes(builder.metadata());
5132
5133        assert!(
5134            sizes.len() > 1,
5135            "Byte limit should trigger before row limit, got {sizes:?}",
5136        );
5137
5138        assert!(
5139            sizes.iter().all(|&s| s < 1000),
5140            "No row group should hit the row limit"
5141        );
5142
5143        let total_rows: i64 = sizes.iter().sum();
5144        assert_eq!(total_rows, 100, "Total rows should be preserved");
5145    }
5146
5147    #[test]
5148    fn arrow_column_chunk_close_mut_drops_column_index() {
5149        use crate::arrow::ArrowSchemaConverter;
5150        use crate::file::writer::SerializedFileWriter;
5151
5152        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
5153        let props = Arc::new(
5154            WriterProperties::builder()
5155                .set_statistics_enabled(EnabledStatistics::Page)
5156                .build(),
5157        );
5158        let parquet_schema = ArrowSchemaConverter::new()
5159            .with_coerce_types(props.coerce_types())
5160            .convert(&schema)
5161            .unwrap();
5162
5163        let mut buf = Vec::with_capacity(1024);
5164        let mut writer =
5165            SerializedFileWriter::new(&mut buf, parquet_schema.root_schema_ptr(), props.clone())
5166                .unwrap();
5167
5168        let factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(&schema));
5169        let mut col_writers = factory.create_column_writers(0).unwrap();
5170        let arr: ArrayRef = Arc::new(Int32Array::from_iter_values(0..64));
5171        for leaves in compute_leaves(schema.field(0), &arr).unwrap() {
5172            col_writers[0].write(&leaves).unwrap();
5173        }
5174        let mut chunk = col_writers.pop().unwrap().close().unwrap();
5175
5176        // Immutable accessor exposes the close result produced at close time.
5177        assert!(
5178            chunk.close().column_index.is_some(),
5179            "EnabledStatistics::Page should produce a column_index"
5180        );
5181
5182        // Mutable accessor lets callers drop the page-level index before append.
5183        chunk.close_mut().column_index = None;
5184        assert!(chunk.close().column_index.is_none());
5185
5186        let mut rg = writer.next_row_group().unwrap();
5187        chunk.append_to_row_group(&mut rg).unwrap();
5188        rg.close().unwrap();
5189        let file_meta = writer.close().unwrap();
5190
5191        // After dropping column_index, the resulting file records no column
5192        // index offset/length for this chunk.
5193        let cc = file_meta.row_group(0).column(0);
5194        assert!(cc.column_index_range().is_none());
5195    }
5196}