parquet/arrow/arrow_writer/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains the writer that writes Arrow data into the Parquet format.
19
20use crate::column::chunker::ContentDefinedChunker;
21
22use bytes::Bytes;
23use std::io::{Read, Write};
24use std::iter::Peekable;
25use std::slice::Iter;
26use std::sync::{Arc, Mutex};
27use std::vec::IntoIter;
28
29use arrow_array::cast::AsArray;
30use arrow_array::types::*;
31use arrow_array::{ArrayRef, Int32Array, RecordBatch, RecordBatchWriter};
32use arrow_schema::{
33    ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef, TimeUnit,
34};
35
36use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};
37
38use crate::arrow::ArrowSchemaConverter;
39use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
40use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
41use crate::column::page_encryption::PageEncryptor;
42use crate::column::writer::encoder::ColumnValueEncoder;
43use crate::column::writer::{
44    ColumnCloseResult, ColumnWriter, GenericColumnWriter, get_column_writer,
45};
46use crate::data_type::{ByteArray, FixedLenByteArray};
47#[cfg(feature = "encryption")]
48use crate::encryption::encrypt::FileEncryptor;
49use crate::errors::{ParquetError, Result};
50use crate::file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaData};
51use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
52use crate::file::reader::{ChunkReader, Length};
53use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
54use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
55use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
56use levels::{ArrayLevels, calculate_array_levels};
57
58mod byte_array;
59mod levels;
60
61/// Encodes [`RecordBatch`] to parquet
62///
63/// Writes Arrow `RecordBatch`es to a Parquet writer. Multiple [`RecordBatch`]es will be encoded
64/// into the same row group, up to `max_row_group_size` rows. Any remaining rows will be
65/// flushed on close, so the final row group in the output file may
66/// contain fewer than `max_row_group_size` rows.
67///
68/// # Example: Writing `RecordBatch`es
69/// ```
70/// # use std::sync::Arc;
71/// # use bytes::Bytes;
72/// # use arrow_array::{ArrayRef, Int64Array};
73/// # use arrow_array::RecordBatch;
74/// # use parquet::arrow::arrow_writer::ArrowWriter;
75/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
76/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
77/// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
78///
79/// let mut buffer = Vec::new();
80/// let mut writer = ArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
81/// writer.write(&to_write).unwrap();
82/// writer.close().unwrap();
83///
84/// let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap();
85/// let read = reader.next().unwrap().unwrap();
86///
87/// assert_eq!(to_write, read);
88/// ```
89///
90/// # Memory Usage and Limiting
91///
92/// The nature of Parquet requires buffering of an entire row group before it can
93/// be flushed to the underlying writer. Data is mostly buffered in its encoded
94/// form, reducing memory usage. However, some data such as dictionary keys,
95/// large strings or very nested data may still result in non-trivial memory
96/// usage.
97///
98/// See Also:
99/// * [`ArrowWriter::memory_size`]: the current memory usage of the writer.
100/// * [`ArrowWriter::in_progress_size`]: the estimated size of the buffered row group.
101///
102/// Call [`Self::flush`] to trigger an early flush of a row group based on a
103/// memory threshold and/or global memory pressure. However, smaller row groups
104/// result in higher metadata overhead, and thus may worsen compression ratios
105/// and query performance.
106///
107/// ```no_run
108/// # use std::io::Write;
109/// # use arrow_array::RecordBatch;
110/// # use parquet::arrow::ArrowWriter;
111/// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
112/// # let batch: RecordBatch = todo!();
113/// writer.write(&batch).unwrap();
114/// // Trigger an early flush if anticipated size exceeds 1_000_000
115/// if writer.in_progress_size() > 1_000_000 {
116///     writer.flush().unwrap();
117/// }
118/// ```
119///
120/// ## Type Support
121///
122/// The writer supports writing all Arrow [`DataType`]s that have a direct mapping to
123/// Parquet types, including [`StructArray`] and [`ListArray`].
124///
125/// The following are not supported:
126///
127/// * [`IntervalMonthDayNanoArray`]: Parquet does not [support nanosecond intervals].
128///
129/// [`DataType`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html
130/// [`StructArray`]: https://docs.rs/arrow/latest/arrow/array/struct.StructArray.html
131/// [`ListArray`]: https://docs.rs/arrow/latest/arrow/array/type.ListArray.html
132/// [`IntervalMonthDayNanoArray`]: https://docs.rs/arrow/latest/arrow/array/type.IntervalMonthDayNanoArray.html
133/// [support nanosecond intervals]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
134///
135/// ## Type Compatibility
136/// The writer can write Arrow [`RecordBatch`]es whose schemas are logically equivalent to the
137/// writer's schema. This means that for a given column, the writer can accept multiple Arrow
138/// [`DataType`]s that contain the same value type.
139///
140/// For example, the following [`DataType`]s are all logically equivalent and can be written
141/// to the same column:
142/// * String, LargeString, StringView
143/// * Binary, LargeBinary, BinaryView
144///
145/// The writer will also accept both native and dictionary-encoded arrays if the dictionaries
146/// contain compatible values.
147/// ```
148/// # use std::sync::Arc;
149/// # use arrow_array::{DictionaryArray, LargeStringArray, RecordBatch, StringArray, UInt8Array};
150/// # use arrow_schema::{DataType, Field, Schema};
151/// # use parquet::arrow::arrow_writer::ArrowWriter;
152/// let record_batch1 = RecordBatch::try_new(
153///    Arc::new(Schema::new(vec![Field::new("col", DataType::LargeUtf8, false)])),
154///    vec![Arc::new(LargeStringArray::from_iter_values(vec!["a", "b"]))]
155///  )
156/// .unwrap();
157///
158/// let mut buffer = Vec::new();
159/// let mut writer = ArrowWriter::try_new(&mut buffer, record_batch1.schema(), None).unwrap();
160/// writer.write(&record_batch1).unwrap();
161///
162/// let record_batch2 = RecordBatch::try_new(
163///     Arc::new(Schema::new(vec![Field::new(
164///         "col",
165///         DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
166///          false,
167///     )])),
168///     vec![Arc::new(DictionaryArray::new(
169///          UInt8Array::from_iter_values(vec![0, 1]),
170///          Arc::new(StringArray::from_iter_values(vec!["b", "c"])),
171///      ))],
172///  )
173///  .unwrap();
174///  writer.write(&record_batch2).unwrap();
175///  writer.close().unwrap();
176/// ```
177pub struct ArrowWriter<W: Write> {
178    /// Underlying Parquet writer
179    writer: SerializedFileWriter<W>,
180
181    /// The in-progress row group if any
182    in_progress: Option<ArrowRowGroupWriter>,
183
184    /// A copy of the Arrow schema.
185    ///
186    /// The schema is used to verify that each record batch written has the correct schema
187    arrow_schema: SchemaRef,
188
189    /// Creates new [`ArrowRowGroupWriter`] instances as required
190    row_group_writer_factory: ArrowRowGroupWriterFactory,
191
192    /// The maximum number of rows to write to each row group, or None for unlimited
193    max_row_group_row_count: Option<usize>,
194
195    /// The maximum size in bytes for a row group, or None for unlimited
196    max_row_group_bytes: Option<usize>,
197
198    /// CDC chunkers persisted across row groups (one per leaf column).
199    cdc_chunkers: Option<Vec<ContentDefinedChunker>>,
200}
201
202impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
203    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204        let buffered_memory = self.in_progress_size();
205        f.debug_struct("ArrowWriter")
206            .field("writer", &self.writer)
207            .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
208            .field("in_progress_rows", &self.in_progress_rows())
209            .field("arrow_schema", &self.arrow_schema)
210            .field("max_row_group_row_count", &self.max_row_group_row_count)
211            .field("max_row_group_bytes", &self.max_row_group_bytes)
212            .finish()
213    }
214}
215
216impl<W: Write + Send> ArrowWriter<W> {
217    /// Try to create a new Arrow writer
218    ///
219    /// The writer will fail if:
220    ///  * a `SerializedFileWriter` cannot be created from the underlying writer
221    ///  * the Arrow schema contains unsupported datatypes such as Unions
222    pub fn try_new(
223        writer: W,
224        arrow_schema: SchemaRef,
225        props: Option<WriterProperties>,
226    ) -> Result<Self> {
227        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
228        Self::try_new_with_options(writer, arrow_schema, options)
229    }
230
231    /// Try to create a new Arrow writer with [`ArrowWriterOptions`].
232    ///
233    /// The writer will fail if:
234    ///  * a `SerializedFileWriter` cannot be created from the underlying writer
235    ///  * the Arrow schema contains unsupported datatypes such as Unions
236    pub fn try_new_with_options(
237        writer: W,
238        arrow_schema: SchemaRef,
239        options: ArrowWriterOptions,
240    ) -> Result<Self> {
241        let mut props = options.properties;
242
243        let schema = if let Some(parquet_schema) = options.schema_descr {
244            parquet_schema.clone()
245        } else {
246            let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
247            if let Some(schema_root) = &options.schema_root {
248                converter = converter.schema_root(schema_root);
249            }
250
251            converter.convert(&arrow_schema)?
252        };
253
254        if !options.skip_arrow_metadata {
255            // add serialized arrow schema
256            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
257        }
258
259        let max_row_group_row_count = props.max_row_group_row_count();
260        let max_row_group_bytes = props.max_row_group_bytes();
261
262        let props_ptr = Arc::new(props);
263        let file_writer =
264            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;
265
266        let row_group_writer_factory =
267            ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone());
268
269        let cdc_chunkers = props_ptr
270            .content_defined_chunking()
271            .map(|opts| {
272                file_writer
273                    .schema_descr()
274                    .columns()
275                    .iter()
276                    .map(|desc| ContentDefinedChunker::new(desc, opts))
277                    .collect::<Result<Vec<_>>>()
278            })
279            .transpose()?;
280
281        Ok(Self {
282            writer: file_writer,
283            in_progress: None,
284            arrow_schema,
285            row_group_writer_factory,
286            max_row_group_row_count,
287            max_row_group_bytes,
288            cdc_chunkers,
289        })
290    }
291
292    /// Returns metadata for any flushed row groups
293    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
294        self.writer.flushed_row_groups()
295    }
296
297    /// Estimated memory usage, in bytes, of this `ArrowWriter`
298    ///
299    /// This estimate is formed by summing the values of
300    /// [`ArrowColumnWriter::memory_size`] for all in-progress columns.
301    pub fn memory_size(&self) -> usize {
302        match &self.in_progress {
303            Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
304            None => 0,
305        }
306    }
307
308    /// Anticipated encoded size of the in-progress row group.
309    ///
310    /// This estimates the size of the row group once it has been completely
311    /// encoded, and is formed by summing the values of
312    /// [`ArrowColumnWriter::get_estimated_total_bytes`] for all in-progress
313    /// columns.
314    pub fn in_progress_size(&self) -> usize {
315        match &self.in_progress {
316            Some(in_progress) => in_progress
317                .writers
318                .iter()
319                .map(|x| x.get_estimated_total_bytes())
320                .sum(),
321            None => 0,
322        }
323    }
324
325    /// Returns the number of rows buffered in the in-progress row group
326    pub fn in_progress_rows(&self) -> usize {
327        self.in_progress
328            .as_ref()
329            .map(|x| x.buffered_rows)
330            .unwrap_or_default()
331    }
332
333    /// Returns the number of bytes written by this instance
334    pub fn bytes_written(&self) -> usize {
335        self.writer.bytes_written()
336    }
337
338    /// Encodes the provided [`RecordBatch`]
339    ///
340    /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_row_count`]
341    /// rows or [`WriterProperties::max_row_group_bytes`] bytes, the contents of `batch` will be
342    /// written to one or more row groups such that limits are respected.
343    ///
344    /// If both limits are `None`, all data is written to a single row group.
345    /// If one limit is set, that limit is respected.
346    /// If both limits are set, whichever limit is reached first triggers a new row group.
347    ///
348    /// This will fail if the `batch`'s schema does not match the writer's schema.
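    ///
    /// # Example: Limiting row group size
    ///
    /// A minimal sketch of bounding row group sizes, assuming the long-standing
    /// `WriterProperties::builder().set_max_row_group_size(..)` setter controls the
    /// per-row-group row count:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
    /// # use parquet::arrow::arrow_writer::ArrowWriter;
    /// # use parquet::file::properties::WriterProperties;
    /// let col = Arc::new(Int64Array::from_iter_values(0..10)) as ArrayRef;
    /// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
    ///
    /// // Limit each row group to at most 4 rows
    /// let props = WriterProperties::builder().set_max_row_group_size(4).build();
    /// let mut buffer = Vec::new();
    /// let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
    ///
    /// // The 10 rows are split into row groups of 4, 4 and 2 rows
    /// writer.write(&batch).unwrap();
    /// let metadata = writer.close().unwrap();
    /// assert_eq!(metadata.num_row_groups(), 3);
    /// ```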
349    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
350        if batch.num_rows() == 0 {
351            return Ok(());
352        }
353
354        let in_progress = match &mut self.in_progress {
355            Some(in_progress) => in_progress,
356            x => x.insert(
357                self.row_group_writer_factory
358                    .create_row_group_writer(self.writer.flushed_row_groups().len())?,
359            ),
360        };
361
362        if let Some(max_rows) = self.max_row_group_row_count {
363            if in_progress.buffered_rows + batch.num_rows() > max_rows {
364                let to_write = max_rows - in_progress.buffered_rows;
365                let a = batch.slice(0, to_write);
366                let b = batch.slice(to_write, batch.num_rows() - to_write);
367                self.write(&a)?;
368                return self.write(&b);
369            }
370        }
371
372        // Check byte limit: if we have buffered data, use measured average row size
373        // to split batch proactively before exceeding byte limit
374        if let Some(max_bytes) = self.max_row_group_bytes {
375            if in_progress.buffered_rows > 0 {
376                let current_bytes = in_progress.get_estimated_total_bytes();
377
378                if current_bytes >= max_bytes {
379                    self.flush()?;
380                    return self.write(batch);
381                }
382
383                let avg_row_bytes = current_bytes / in_progress.buffered_rows;
384                if avg_row_bytes > 0 {
385                    // At this point, `current_bytes < max_bytes` (checked above)
386                    let remaining_bytes = max_bytes - current_bytes;
387                    let rows_that_fit = remaining_bytes / avg_row_bytes;
388
389                    if batch.num_rows() > rows_that_fit {
390                        if rows_that_fit > 0 {
391                            let a = batch.slice(0, rows_that_fit);
392                            let b = batch.slice(rows_that_fit, batch.num_rows() - rows_that_fit);
393                            self.write(&a)?;
394                            return self.write(&b);
395                        } else {
396                            self.flush()?;
397                            return self.write(batch);
398                        }
399                    }
400                }
401            }
402        }
403
404        match self.cdc_chunkers.as_mut() {
405            Some(chunkers) => in_progress.write_with_chunkers(batch, chunkers)?,
406            None => in_progress.write(batch)?,
407        }
408
409        let should_flush = self
410            .max_row_group_row_count
411            .is_some_and(|max| in_progress.buffered_rows >= max)
412            || self
413                .max_row_group_bytes
414                .is_some_and(|max| in_progress.get_estimated_total_bytes() >= max);
415
416        if should_flush {
417            self.flush()?
418        }
419        Ok(())
420    }
421
422    /// Writes the given bytes to the internal buffer.
423    ///
424    /// It is safe to use this method to write data to the underlying writer,
425    /// because it ensures that the buffering and byte-counting layers are used.
426    pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
427        self.writer.write_all(buf)
428    }
429
430    /// Flushes underlying writer
431    pub fn sync(&mut self) -> std::io::Result<()> {
432        self.writer.flush()
433    }
434
435    /// Flushes all buffered rows into a new row group
436    ///
437    /// Note the underlying writer is not flushed with this call.
438    /// If that behavior is desired, call [`ArrowWriter::sync`].
439    pub fn flush(&mut self) -> Result<()> {
440        let in_progress = match self.in_progress.take() {
441            Some(in_progress) => in_progress,
442            None => return Ok(()),
443        };
444
445        let mut row_group_writer = self.writer.next_row_group()?;
446        for chunk in in_progress.close()? {
447            chunk.append_to_row_group(&mut row_group_writer)?;
448        }
449        row_group_writer.close()?;
450        Ok(())
451    }
452
453    /// Appends [`KeyValue`] metadata to be written in addition to that from [`WriterProperties`]
454    ///
455    /// This method provides a way to append key-value metadata after writing `RecordBatch`es.
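    ///
    /// A minimal sketch, assuming the `KeyValue::new(key, value)` constructor:
    ///
    /// ```no_run
    /// # use parquet::arrow::arrow_writer::ArrowWriter;
    /// # use parquet::file::metadata::KeyValue;
    /// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
    /// // Record additional provenance information after the data has been written
    /// let kv = KeyValue::new("source".to_string(), "example".to_string());
    /// writer.append_key_value_metadata(kv);
    /// ```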
456    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
457        self.writer.append_key_value_metadata(kv_metadata)
458    }
459
460    /// Returns a reference to the underlying writer.
461    pub fn inner(&self) -> &W {
462        self.writer.inner()
463    }
464
465    /// Returns a mutable reference to the underlying writer.
466    ///
467    /// **Warning**: if you write directly to this writer, you will skip
468    /// the `TrackedWrite` buffering and byte-counting layers. This will cause
469    /// the file footer's recorded offsets and sizes to diverge from reality,
470    /// resulting in an unreadable or corrupted Parquet file.
471    ///
472    /// If you want to write safely to the underlying writer, use [`Self::write_all`].
473    pub fn inner_mut(&mut self) -> &mut W {
474        self.writer.inner_mut()
475    }
476
477    /// Flushes any outstanding data and returns the underlying writer.
478    pub fn into_inner(mut self) -> Result<W> {
479        self.flush()?;
480        self.writer.into_inner()
481    }
482
483    /// Close and finalize the underlying Parquet writer
484    ///
485    /// Unlike [`Self::close`] this does not consume self
486    ///
487    /// Attempting to write after calling finish will result in an error
488    pub fn finish(&mut self) -> Result<ParquetMetaData> {
489        self.flush()?;
490        self.writer.finish()
491    }
492
493    /// Close and finalize the underlying Parquet writer
494    pub fn close(mut self) -> Result<ParquetMetaData> {
495        self.finish()
496    }
497
498    /// Create a new row group writer and return its column writers.
499    #[deprecated(
500        since = "56.2.0",
501        note = "Use `ArrowRowGroupWriterFactory` instead, see `ArrowColumnWriter` for an example"
502    )]
503    pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
504        self.flush()?;
505        let in_progress = self
506            .row_group_writer_factory
507            .create_row_group_writer(self.writer.flushed_row_groups().len())?;
508        Ok(in_progress.writers)
509    }
510
511    /// Append the given column chunks to the file as a new row group.
512    #[deprecated(
513        since = "56.2.0",
514        note = "Use `SerializedFileWriter` directly instead, see `ArrowColumnWriter` for an example"
515    )]
516    pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
517        let mut row_group_writer = self.writer.next_row_group()?;
518        for chunk in chunks {
519            chunk.append_to_row_group(&mut row_group_writer)?;
520        }
521        row_group_writer.close()?;
522        Ok(())
523    }
524
525    /// Converts this writer into a lower-level [`SerializedFileWriter`] and [`ArrowRowGroupWriterFactory`].
526    ///
527    /// Flushes any outstanding data before returning.
528    ///
529    /// This can be useful to provide more control over how files are written, for example
530    /// to write columns in parallel. See the example on [`ArrowColumnWriter`].
531    pub fn into_serialized_writer(
532        mut self,
533    ) -> Result<(SerializedFileWriter<W>, ArrowRowGroupWriterFactory)> {
534        self.flush()?;
535        Ok((self.writer, self.row_group_writer_factory))
536    }
537}
538
539impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
540    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
541        self.write(batch).map_err(|e| e.into())
542    }
543
544    fn close(self) -> std::result::Result<(), ArrowError> {
545        self.close()?;
546        Ok(())
547    }
548}
549
550/// Arrow-specific configuration settings for writing parquet files.
551///
552/// See [`ArrowWriter`] for how to configure the writer.
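///
/// A minimal sketch of constructing options using the builder-style methods below:
///
/// ```
/// # use parquet::arrow::arrow_writer::ArrowWriterOptions;
/// # use parquet::file::properties::WriterProperties;
/// // Use default writer properties, but omit the embedded Arrow schema metadata
/// let options = ArrowWriterOptions::new()
///     .with_properties(WriterProperties::default())
///     .with_skip_arrow_metadata(true);
/// ```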
553#[derive(Debug, Clone, Default)]
554pub struct ArrowWriterOptions {
555    properties: WriterProperties,
556    skip_arrow_metadata: bool,
557    schema_root: Option<String>,
558    schema_descr: Option<SchemaDescriptor>,
559}
560
561impl ArrowWriterOptions {
562    /// Creates a new [`ArrowWriterOptions`] with the default settings.
563    pub fn new() -> Self {
564        Self::default()
565    }
566
567    /// Sets the [`WriterProperties`] for writing parquet files.
568    pub fn with_properties(self, properties: WriterProperties) -> Self {
569        Self { properties, ..self }
570    }
571
572    /// Skip encoding the embedded arrow metadata (defaults to `false`)
573    ///
574    /// Parquet files generated by the [`ArrowWriter`] contain embedded arrow schema
575    /// by default.
576    ///
577    /// Set `skip_arrow_metadata` to `true` to skip encoding the embedded metadata.
578    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
579        Self {
580            skip_arrow_metadata,
581            ..self
582        }
583    }
584
585    /// Set the name of the root parquet schema element (defaults to `"arrow_schema"`)
586    pub fn with_schema_root(self, schema_root: String) -> Self {
587        Self {
588            schema_root: Some(schema_root),
589            ..self
590        }
591    }
592
593    /// Explicitly specify the Parquet schema to be used
594    ///
595    /// If omitted (the default), the [`ArrowSchemaConverter`] is used to compute the
596    /// Parquet [`SchemaDescriptor`]. This may be used when the [`SchemaDescriptor`] is
597    /// already known or must be calculated using custom logic.
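    ///
    /// A minimal sketch, computing the [`SchemaDescriptor`] up front with the
    /// [`ArrowSchemaConverter`] (similar to the conversion performed when no schema is supplied):
    ///
    /// ```
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::ArrowSchemaConverter;
    /// # use parquet::arrow::arrow_writer::ArrowWriterOptions;
    /// let schema = Schema::new(vec![Field::new("col", DataType::Int64, false)]);
    /// // Convert the Arrow schema to a Parquet SchemaDescriptor
    /// let parquet_schema = ArrowSchemaConverter::new().convert(&schema).unwrap();
    /// let options = ArrowWriterOptions::new().with_parquet_schema(parquet_schema);
    /// ```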
598    pub fn with_parquet_schema(self, schema_descr: SchemaDescriptor) -> Self {
599        Self {
600            schema_descr: Some(schema_descr),
601            ..self
602        }
603    }
604}
605
606/// A single column chunk produced by [`ArrowColumnWriter`]
607#[derive(Default)]
608struct ArrowColumnChunkData {
609    length: usize,
610    data: Vec<Bytes>,
611}
612
613impl Length for ArrowColumnChunkData {
614    fn len(&self) -> u64 {
615        self.length as _
616    }
617}
618
619impl ChunkReader for ArrowColumnChunkData {
620    type T = ArrowColumnChunkReader;
621
622    fn get_read(&self, start: u64) -> Result<Self::T> {
623        assert_eq!(start, 0); // Assume append_column writes all data in one-shot
624        Ok(ArrowColumnChunkReader(
625            self.data.clone().into_iter().peekable(),
626        ))
627    }
628
629    fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
630        unimplemented!()
631    }
632}
633
634/// A [`Read`] for [`ArrowColumnChunkData`]
635struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
636
637impl Read for ArrowColumnChunkReader {
638    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
639        let buffer = loop {
640            match self.0.peek_mut() {
641                Some(b) if b.is_empty() => {
642                    self.0.next();
643                    continue;
644                }
645                Some(b) => break b,
646                None => return Ok(0),
647            }
648        };
649
650        let len = buffer.len().min(out.len());
651        let b = buffer.split_to(len);
652        out[..len].copy_from_slice(&b);
653        Ok(len)
654    }
655}
656
657/// A shared [`ArrowColumnChunkData`]
658///
659/// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access via
660/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows
661type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
662
663#[derive(Default)]
664struct ArrowPageWriter {
665    buffer: SharedColumnChunk,
666    #[cfg(feature = "encryption")]
667    page_encryptor: Option<PageEncryptor>,
668}
669
670impl ArrowPageWriter {
671    #[cfg(feature = "encryption")]
672    pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
673        self.page_encryptor = page_encryptor;
674        self
675    }
676
677    #[cfg(feature = "encryption")]
678    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
679        self.page_encryptor.as_mut()
680    }
681
682    #[cfg(not(feature = "encryption"))]
683    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
684        None
685    }
686}
687
688impl PageWriter for ArrowPageWriter {
689    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
690        let page = match self.page_encryptor_mut() {
691            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
692            None => page,
693        };
694
695        let page_header = page.to_thrift_header()?;
696        let header = {
697            let mut header = Vec::with_capacity(1024);
698
699            match self.page_encryptor_mut() {
700                Some(page_encryptor) => {
701                    page_encryptor.encrypt_page_header(&page_header, &mut header)?;
702                    if page.compressed_page().is_data_page() {
703                        page_encryptor.increment_page();
704                    }
705                }
706                None => {
707                    let mut protocol = ThriftCompactOutputProtocol::new(&mut header);
708                    page_header.write_thrift(&mut protocol)?;
709                }
710            };
711
712            Bytes::from(header)
713        };
714
715        let mut buf = self.buffer.try_lock().unwrap();
716
717        let data = page.compressed_page().buffer().clone();
718        let compressed_size = data.len() + header.len();
719
720        let mut spec = PageWriteSpec::new();
721        spec.page_type = page.page_type();
722        spec.num_values = page.num_values();
723        spec.uncompressed_size = page.uncompressed_size() + header.len();
724        spec.offset = buf.length as u64;
725        spec.compressed_size = compressed_size;
726        spec.bytes_written = compressed_size as u64;
727
728        buf.length += compressed_size;
729        buf.data.push(header);
730        buf.data.push(data);
731
732        Ok(spec)
733    }
734
735    fn close(&mut self) -> Result<()> {
736        Ok(())
737    }
738}
739
740/// A leaf column that can be encoded by [`ArrowColumnWriter`]
741#[derive(Debug)]
742pub struct ArrowLeafColumn(ArrayLevels);
743
744/// Computes the [`ArrowLeafColumn`] for a potentially nested [`ArrayRef`]
745///
746/// This function can be used along with [`get_column_writers`] to encode
747/// individual columns in parallel. See the example on [`ArrowColumnWriter`].
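///
/// For example, a flat (non-nested) column produces exactly one leaf (a minimal sketch):
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int32Array};
/// # use arrow_schema::{DataType, Field};
/// # use parquet::arrow::arrow_writer::compute_leaves;
/// let field = Field::new("i", DataType::Int32, false);
/// let array = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
/// let leaves = compute_leaves(&field, &array).unwrap();
/// assert_eq!(leaves.len(), 1);
/// ```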
748pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
749    let levels = calculate_array_levels(array, field)?;
750    Ok(levels.into_iter().map(ArrowLeafColumn).collect())
751}
752
753/// The data for a single column chunk, see [`ArrowColumnWriter`]
754pub struct ArrowColumnChunk {
755    data: ArrowColumnChunkData,
756    close: ColumnCloseResult,
757}
758
759impl std::fmt::Debug for ArrowColumnChunk {
760    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
761        f.debug_struct("ArrowColumnChunk")
762            .field("length", &self.data.length)
763            .finish_non_exhaustive()
764    }
765}
766
767impl ArrowColumnChunk {
768    /// Calls [`SerializedRowGroupWriter::append_column`] with this column's data
769    pub fn append_to_row_group<W: Write + Send>(
770        self,
771        writer: &mut SerializedRowGroupWriter<'_, W>,
772    ) -> Result<()> {
773        writer.append_column(&self.data, self.close)
774    }
775}
776
777/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
778///
779/// `ArrowColumnWriter` instances can be created using an [`ArrowRowGroupWriterFactory`].
780///
781/// Note: This is a low-level interface for applications that require
782/// fine-grained control of encoding (e.g. encoding using multiple threads);
783/// see [`ArrowWriter`] for a higher-level interface
784///
785/// # Example: Encoding two Arrow Arrays in Parallel
786/// ```
787/// // The arrow schema
788/// # use std::sync::Arc;
789/// # use arrow_array::*;
790/// # use arrow_schema::*;
791/// # use parquet::arrow::ArrowSchemaConverter;
792/// # use parquet::arrow::arrow_writer::{compute_leaves, ArrowColumnChunk, ArrowLeafColumn, ArrowRowGroupWriterFactory};
793/// # use parquet::file::properties::WriterProperties;
794/// # use parquet::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
795/// #
796/// let schema = Arc::new(Schema::new(vec![
797///     Field::new("i32", DataType::Int32, false),
798///     Field::new("f32", DataType::Float32, false),
799/// ]));
800///
801/// // Compute the parquet schema
802/// let props = Arc::new(WriterProperties::default());
803/// let parquet_schema = ArrowSchemaConverter::new()
804///   .with_coerce_types(props.coerce_types())
805///   .convert(&schema)
806///   .unwrap();
807///
808/// // Create parquet writer
809/// let root_schema = parquet_schema.root_schema_ptr();
810/// // write to memory in the example, but this could be a File
811/// let mut out = Vec::with_capacity(1024);
812/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone())
813///   .unwrap();
814///
815/// // Create a factory for building Arrow column writers
816/// let row_group_factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(&schema));
817/// // Create column writers for the 0th row group
818/// let col_writers = row_group_factory.create_column_writers(0).unwrap();
819///
820/// // Spawn a worker thread for each column
821/// //
822/// // Note: This is for demonstration purposes; a thread pool (e.g. rayon or tokio) would be better.
823/// // The `map` produces an iterator of `(thread handle, send channel)` tuples.
824/// let mut workers: Vec<_> = col_writers
825///     .into_iter()
826///     .map(|mut col_writer| {
827///         let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
828///         let handle = std::thread::spawn(move || {
829///             // receive Arrays to encode via the channel
830///             for col in recv {
831///                 col_writer.write(&col)?;
832///             }
833///             // once the input is complete, close the writer
834///             // to return the newly created ArrowColumnChunk
835///             col_writer.close()
836///         });
837///         (handle, send)
838///     })
839///     .collect();
840///
841/// // Start row group
842/// let mut row_group_writer: SerializedRowGroupWriter<'_, _> = writer
843///   .next_row_group()
844///   .unwrap();
845///
846/// // Create some example input columns to encode
847/// let to_write = vec![
848///     Arc::new(Int32Array::from_iter_values([1, 2, 3])) as _,
849///     Arc::new(Float32Array::from_iter_values([1., 45., -1.])) as _,
850/// ];
851///
852/// // Send the input columns to the workers
853/// let mut worker_iter = workers.iter_mut();
854/// for (arr, field) in to_write.iter().zip(&schema.fields) {
855///     for leaves in compute_leaves(field, arr).unwrap() {
856///         worker_iter.next().unwrap().1.send(leaves).unwrap();
857///     }
858/// }
859///
860/// // Wait for the workers to complete encoding, and append
861/// // the resulting column chunks to the row group (and the file)
862/// for (handle, send) in workers {
863///     drop(send); // Drop send side to signal termination
864///     // wait for the worker to send the completed chunk
865///     let chunk: ArrowColumnChunk = handle.join().unwrap().unwrap();
866///     chunk.append_to_row_group(&mut row_group_writer).unwrap();
867/// }
868/// // Close the row group which writes to the underlying file
869/// row_group_writer.close().unwrap();
870///
871/// let metadata = writer.close().unwrap();
872/// assert_eq!(metadata.file_metadata().num_rows(), 3);
873/// ```
874pub struct ArrowColumnWriter {
875    writer: ArrowColumnWriterImpl,
876    chunk: SharedColumnChunk,
877}
878
879impl std::fmt::Debug for ArrowColumnWriter {
880    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
881        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
882    }
883}
884
885enum ArrowColumnWriterImpl {
886    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
887    Column(ColumnWriter<'static>),
888}
889
890impl ArrowColumnWriter {
891    /// Write an [`ArrowLeafColumn`]
892    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
893        self.write_internal(&col.0)
894    }
895
896    /// Write with content-defined chunking, inserting page flushes at chunk boundaries.
897    fn write_with_chunker(
898        &mut self,
899        col: &ArrowLeafColumn,
900        chunker: &mut ContentDefinedChunker,
901    ) -> Result<()> {
902        let levels = &col.0;
903        let chunks =
904            chunker.get_arrow_chunks(levels.def_levels(), levels.rep_levels(), levels.array())?;
905
906        let num_chunks = chunks.len();
907        for (i, chunk) in chunks.iter().enumerate() {
908            let chunk_levels = levels.slice_for_chunk(chunk);
909            self.write_internal(&chunk_levels)?;
910
911            // Add a page break after each chunk except the last
912            if i + 1 < num_chunks {
913                match &mut self.writer {
914                    ArrowColumnWriterImpl::Column(c) => c.add_data_page()?,
915                    ArrowColumnWriterImpl::ByteArray(c) => c.add_data_page()?,
916                }
917            }
918        }
919        Ok(())
920    }
921
922    fn write_internal(&mut self, levels: &ArrayLevels) -> Result<()> {
923        match &mut self.writer {
924            ArrowColumnWriterImpl::Column(c) => {
925                let leaf = levels.array();
926                match leaf.as_any_dictionary_opt() {
927                    Some(dictionary) => {
928                        let materialized =
929                            arrow_select::take::take(dictionary.values(), dictionary.keys(), None)?;
930                        write_leaf(c, &materialized, levels)?
931                    }
932                    None => write_leaf(c, leaf, levels)?,
933                };
934            }
935            ArrowColumnWriterImpl::ByteArray(c) => {
936                write_primitive(c, levels.array().as_ref(), levels)?;
937            }
938        }
939        Ok(())
940    }
941
942    /// Close this column returning the written [`ArrowColumnChunk`]
943    pub fn close(self) -> Result<ArrowColumnChunk> {
944        let close = match self.writer {
945            ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
946            ArrowColumnWriterImpl::Column(c) => c.close()?,
947        };
948        let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
949        let data = chunk.into_inner().unwrap();
950        Ok(ArrowColumnChunk { data, close })
951    }
952
953    /// Returns the estimated total memory usage by the writer.
954    ///
955    /// Unlike [`Self::get_estimated_total_bytes`], this is an estimate
956    /// of the current memory usage and not its anticipated encoded size.
957    ///
958    /// This includes:
959    /// 1. Data buffered in encoded form
960    /// 2. Data buffered in un-encoded form (e.g. `usize` dictionary keys)
961    ///
962    /// This value should be greater than or equal to [`Self::get_estimated_total_bytes`]
963    pub fn memory_size(&self) -> usize {
964        match &self.writer {
965            ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
966            ArrowColumnWriterImpl::Column(c) => c.memory_size(),
967        }
968    }
969
970    /// Returns the estimated total encoded bytes for this column writer.
971    ///
972    /// This includes:
973    /// 1. Data buffered in encoded form
974    /// 2. An estimate of how large the data buffered in un-encoded form would be once encoded
975    ///
976    /// This value should be less than or equal to [`Self::memory_size`]
977    pub fn get_estimated_total_bytes(&self) -> usize {
978        match &self.writer {
979            ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
980            ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
981        }
982    }
983}
984
985/// Encodes [`RecordBatch`] to a parquet row group
986///
987/// Note: this structure is created internally by [`ArrowRowGroupWriterFactory`],
988/// but it is not exposed publicly.
989///
990/// See the example on [`ArrowColumnWriter`] for how to encode columns in parallel
991#[derive(Debug)]
992struct ArrowRowGroupWriter {
993    writers: Vec<ArrowColumnWriter>,
994    schema: SchemaRef,
995    buffered_rows: usize,
996}
997
998impl ArrowRowGroupWriter {
999    fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
1000        Self {
1001            writers,
1002            schema: arrow.clone(),
1003            buffered_rows: 0,
1004        }
1005    }
1006
1007    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
1008        self.buffered_rows += batch.num_rows();
1009        let mut writers = self.writers.iter_mut();
1010        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
1011            for leaf in compute_leaves(field.as_ref(), column)? {
1012                writers.next().unwrap().write(&leaf)?;
1013            }
1014        }
1015        Ok(())
1016    }
1017
1018    fn write_with_chunkers(
1019        &mut self,
1020        batch: &RecordBatch,
1021        chunkers: &mut [ContentDefinedChunker],
1022    ) -> Result<()> {
1023        self.buffered_rows += batch.num_rows();
1024        let mut writers = self.writers.iter_mut();
1025        let mut chunkers = chunkers.iter_mut();
1026        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
1027            for leaf in compute_leaves(field.as_ref(), column)? {
1028                writers
1029                    .next()
1030                    .unwrap()
1031                    .write_with_chunker(&leaf, chunkers.next().unwrap())?;
1032            }
1033        }
1034        Ok(())
1035    }
1036
1037    /// Returns the estimated total encoded bytes for this row group
1038    fn get_estimated_total_bytes(&self) -> usize {
1039        self.writers
1040            .iter()
1041            .map(|x| x.get_estimated_total_bytes())
1042            .sum()
1043    }
1044
1045    fn close(self) -> Result<Vec<ArrowColumnChunk>> {
1046        self.writers
1047            .into_iter()
1048            .map(|writer| writer.close())
1049            .collect()
1050    }
1051}
1052
1053/// Factory that creates new column writers for each row group in the Parquet file.
1054///
1055/// You can create this structure via [`ArrowWriter::into_serialized_writer`].
1056/// See the example on [`ArrowColumnWriter`] for how to encode columns in parallel
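///
/// A minimal sketch of obtaining a factory from an existing [`ArrowWriter`]:
///
/// ```no_run
/// # use parquet::arrow::arrow_writer::ArrowWriter;
/// # let writer: ArrowWriter<Vec<u8>> = todo!();
/// // Flush any buffered rows and take the writer apart
/// let (file_writer, factory) = writer.into_serialized_writer().unwrap();
/// // `factory` can now create column writers for each new row group
/// # let _ = (file_writer, factory);
/// ```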
1057#[derive(Debug)]
1058pub struct ArrowRowGroupWriterFactory {
1059    schema: SchemaDescPtr,
1060    arrow_schema: SchemaRef,
1061    props: WriterPropertiesPtr,
1062    #[cfg(feature = "encryption")]
1063    file_encryptor: Option<Arc<FileEncryptor>>,
1064}
1065
1066impl ArrowRowGroupWriterFactory {
1067    /// Create a new [`ArrowRowGroupWriterFactory`] for the provided file writer and Arrow schema
1068    pub fn new<W: Write + Send>(
1069        file_writer: &SerializedFileWriter<W>,
1070        arrow_schema: SchemaRef,
1071    ) -> Self {
1072        let schema = Arc::clone(file_writer.schema_descr_ptr());
1073        let props = Arc::clone(file_writer.properties());
1074        Self {
1075            schema,
1076            arrow_schema,
1077            props,
1078            #[cfg(feature = "encryption")]
1079            file_encryptor: file_writer.file_encryptor(),
1080        }
1081    }
1082
1083    fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
1084        let writers = self.create_column_writers(row_group_index)?;
1085        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
1086    }
1087
1088    /// Create column writers for a new row group, with the given row group index
1089    pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
1090        let mut writers = Vec::with_capacity(self.arrow_schema.fields.len());
1091        let mut leaves = self.schema.columns().iter();
1092        let column_factory = self.column_writer_factory(row_group_index);
1093        for field in &self.arrow_schema.fields {
1094            column_factory.get_arrow_column_writer(
1095                field.data_type(),
1096                &self.props,
1097                &mut leaves,
1098                &mut writers,
1099            )?;
1100        }
1101        Ok(writers)
1102    }
1103
1104    #[cfg(feature = "encryption")]
1105    fn column_writer_factory(&self, row_group_idx: usize) -> ArrowColumnWriterFactory {
1106        ArrowColumnWriterFactory::new()
1107            .with_file_encryptor(row_group_idx, self.file_encryptor.clone())
1108    }
1109
1110    #[cfg(not(feature = "encryption"))]
1111    fn column_writer_factory(&self, _row_group_idx: usize) -> ArrowColumnWriterFactory {
1112        ArrowColumnWriterFactory::new()
1113    }
1114}
1115
1116/// Returns [`ArrowColumnWriter`]s for each column in a given schema
1117#[deprecated(since = "57.0.0", note = "Use `ArrowRowGroupWriterFactory` instead")]
1118pub fn get_column_writers(
1119    parquet: &SchemaDescriptor,
1120    props: &WriterPropertiesPtr,
1121    arrow: &SchemaRef,
1122) -> Result<Vec<ArrowColumnWriter>> {
1123    let mut writers = Vec::with_capacity(arrow.fields.len());
1124    let mut leaves = parquet.columns().iter();
1125    let column_factory = ArrowColumnWriterFactory::new();
1126    for field in &arrow.fields {
1127        column_factory.get_arrow_column_writer(
1128            field.data_type(),
1129            props,
1130            &mut leaves,
1131            &mut writers,
1132        )?;
1133    }
1134    Ok(writers)
1135}
1136
1137/// Creates [`ArrowColumnWriter`] instances
1138struct ArrowColumnWriterFactory {
1139    #[cfg(feature = "encryption")]
1140    row_group_index: usize,
1141    #[cfg(feature = "encryption")]
1142    file_encryptor: Option<Arc<FileEncryptor>>,
1143}
1144
1145impl ArrowColumnWriterFactory {
1146    pub fn new() -> Self {
1147        Self {
1148            #[cfg(feature = "encryption")]
1149            row_group_index: 0,
1150            #[cfg(feature = "encryption")]
1151            file_encryptor: None,
1152        }
1153    }
1154
1155    #[cfg(feature = "encryption")]
1156    pub fn with_file_encryptor(
1157        mut self,
1158        row_group_index: usize,
1159        file_encryptor: Option<Arc<FileEncryptor>>,
1160    ) -> Self {
1161        self.row_group_index = row_group_index;
1162        self.file_encryptor = file_encryptor;
1163        self
1164    }
1165
1166    #[cfg(feature = "encryption")]
1167    fn create_page_writer(
1168        &self,
1169        column_descriptor: &ColumnDescPtr,
1170        column_index: usize,
1171    ) -> Result<Box<ArrowPageWriter>> {
1172        let column_path = column_descriptor.path().string();
1173        let page_encryptor = PageEncryptor::create_if_column_encrypted(
1174            &self.file_encryptor,
1175            self.row_group_index,
1176            column_index,
1177            &column_path,
1178        )?;
1179        Ok(Box::new(
1180            ArrowPageWriter::default().with_encryptor(page_encryptor),
1181        ))
1182    }
1183
1184    #[cfg(not(feature = "encryption"))]
1185    fn create_page_writer(
1186        &self,
1187        _column_descriptor: &ColumnDescPtr,
1188        _column_index: usize,
1189    ) -> Result<Box<ArrowPageWriter>> {
1190        Ok(Box::<ArrowPageWriter>::default())
1191    }
1192
1193    /// Gets [`ArrowColumnWriter`]s for the given `data_type`, consuming the corresponding
1194    /// [`ColumnDescPtr`]s from `leaves` and appending the column writers to `out`
1195    fn get_arrow_column_writer(
1196        &self,
1197        data_type: &ArrowDataType,
1198        props: &WriterPropertiesPtr,
1199        leaves: &mut Iter<'_, ColumnDescPtr>,
1200        out: &mut Vec<ArrowColumnWriter>,
1201    ) -> Result<()> {
1202        // Instantiate writers for normal columns
1203        let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1204            let page_writer = self.create_page_writer(desc, out.len())?;
1205            let chunk = page_writer.buffer.clone();
1206            let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
1207            Ok(ArrowColumnWriter {
1208                chunk,
1209                writer: ArrowColumnWriterImpl::Column(writer),
1210            })
1211        };
1212
1213        // Instantiate writers for byte arrays (e.g. Utf8, Binary, etc.)
1214        let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1215            let page_writer = self.create_page_writer(desc, out.len())?;
1216            let chunk = page_writer.buffer.clone();
1217            let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
1218            Ok(ArrowColumnWriter {
1219                chunk,
1220                writer: ArrowColumnWriterImpl::ByteArray(writer),
1221            })
1222        };
1223
1224        match data_type {
1225            _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
1226            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
1227                out.push(col(leaves.next().unwrap())?)
1228            }
1229            ArrowDataType::LargeBinary
1230            | ArrowDataType::Binary
1231            | ArrowDataType::Utf8
1232            | ArrowDataType::LargeUtf8
1233            | ArrowDataType::BinaryView
1234            | ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
1235            ArrowDataType::List(f)
1236            | ArrowDataType::LargeList(f)
1237            | ArrowDataType::FixedSizeList(f, _)
1238            | ArrowDataType::ListView(f)
1239            | ArrowDataType::LargeListView(f) => {
1240                self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
1241            }
1242            ArrowDataType::Struct(fields) => {
1243                for field in fields {
1244                    self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
1245                }
1246            }
1247            ArrowDataType::Map(f, _) => match f.data_type() {
1248                ArrowDataType::Struct(f) => {
1249                    self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
1250                    self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
1251                }
1252                _ => unreachable!("invalid map type"),
1253            },
1254            ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1255                ArrowDataType::Utf8
1256                | ArrowDataType::LargeUtf8
1257                | ArrowDataType::Binary
1258                | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
1259                ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
1260                    out.push(bytes(leaves.next().unwrap())?)
1261                }
1262                ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
1263                _ => out.push(col(leaves.next().unwrap())?),
1264            },
1265            _ => {
1266                return Err(ParquetError::NYI(format!(
1267                    "Attempting to write an Arrow type {data_type} to parquet that is not yet implemented"
1268                )));
1269            }
1270        }
1271        Ok(())
1272    }
1273}
1274
1275fn write_leaf(
1276    writer: &mut ColumnWriter<'_>,
1277    column: &dyn arrow_array::Array,
1278    levels: &ArrayLevels,
1279) -> Result<usize> {
1280    let indices = levels.non_null_indices();
1281
1282    match writer {
1283        // Note: this should match the contents of arrow_to_parquet_type
1284        ColumnWriter::Int32ColumnWriter(typed) => {
1285            match column.data_type() {
1286                ArrowDataType::Null => {
1287                    let array = Int32Array::new_null(column.len());
1288                    write_primitive(typed, array.values(), levels)
1289                }
1290                ArrowDataType::Int8 => {
1291                    let array: Int32Array = column.as_primitive::<Int8Type>().unary(|x| x as i32);
1292                    write_primitive(typed, array.values(), levels)
1293                }
1294                ArrowDataType::Int16 => {
1295                    let array: Int32Array = column.as_primitive::<Int16Type>().unary(|x| x as i32);
1296                    write_primitive(typed, array.values(), levels)
1297                }
1298                ArrowDataType::Int32 => {
1299                    write_primitive(typed, column.as_primitive::<Int32Type>().values(), levels)
1300                }
1301                ArrowDataType::UInt8 => {
1302                    let array: Int32Array = column.as_primitive::<UInt8Type>().unary(|x| x as i32);
1303                    write_primitive(typed, array.values(), levels)
1304                }
1305                ArrowDataType::UInt16 => {
1306                    let array: Int32Array = column.as_primitive::<UInt16Type>().unary(|x| x as i32);
1307                    write_primitive(typed, array.values(), levels)
1308                }
1309                ArrowDataType::UInt32 => {
1310                    // follow C++ implementation and use overflow/reinterpret cast from u32 to i32 which will map
1311                    // `(i32::MAX as u32)..u32::MAX` to `i32::MIN..0`
1312                    let array = column.as_primitive::<UInt32Type>();
1313                    write_primitive(typed, array.values().inner().typed_data(), levels)
1314                }
1315                ArrowDataType::Date32 => {
1316                    let array = column.as_primitive::<Date32Type>();
1317                    write_primitive(typed, array.values(), levels)
1318                }
1319                ArrowDataType::Time32(TimeUnit::Second) => {
1320                    let array = column.as_primitive::<Time32SecondType>();
1321                    write_primitive(typed, array.values(), levels)
1322                }
1323                ArrowDataType::Time32(TimeUnit::Millisecond) => {
1324                    let array = column.as_primitive::<Time32MillisecondType>();
1325                    write_primitive(typed, array.values(), levels)
1326                }
1327                ArrowDataType::Date64 => {
1328                    // If the column is a Date64 (milliseconds), truncate it to whole days
1329                    let array: Int32Array = column
1330                        .as_primitive::<Date64Type>()
1331                        .unary(|x| (x / 86_400_000) as _);
1332
1333                    write_primitive(typed, array.values(), levels)
1334                }
1335                ArrowDataType::Decimal32(_, _) => {
1336                    let array = column
1337                        .as_primitive::<Decimal32Type>()
1338                        .unary::<_, Int32Type>(|v| v);
1339                    write_primitive(typed, array.values(), levels)
1340                }
1341                ArrowDataType::Decimal64(_, _) => {
1342                    // use an int32 to represent the low-precision decimal
1343                    let array = column
1344                        .as_primitive::<Decimal64Type>()
1345                        .unary::<_, Int32Type>(|v| v as i32);
1346                    write_primitive(typed, array.values(), levels)
1347                }
1348                ArrowDataType::Decimal128(_, _) => {
1349                    // use an int32 to represent the low-precision decimal
1350                    let array = column
1351                        .as_primitive::<Decimal128Type>()
1352                        .unary::<_, Int32Type>(|v| v as i32);
1353                    write_primitive(typed, array.values(), levels)
1354                }
1355                ArrowDataType::Decimal256(_, _) => {
1356                    // use an int32 to represent the low-precision decimal
1357                    let array = column
1358                        .as_primitive::<Decimal256Type>()
1359                        .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1360                    write_primitive(typed, array.values(), levels)
1361                }
1362                d => Err(ParquetError::General(format!("Cannot coerce {d} to I32"))),
1363            }
1364        }
1365        ColumnWriter::BoolColumnWriter(typed) => {
1366            let array = column.as_boolean();
1367            typed.write_batch(
1368                get_bool_array_slice(array, indices).as_slice(),
1369                levels.def_levels(),
1370                levels.rep_levels(),
1371            )
1372        }
1373        ColumnWriter::Int64ColumnWriter(typed) => {
1374            match column.data_type() {
1375                ArrowDataType::Date64 => {
1376                    let array = column
1377                        .as_primitive::<Date64Type>()
1378                        .reinterpret_cast::<Int64Type>();
1379
1380                    write_primitive(typed, array.values(), levels)
1381                }
1382                ArrowDataType::Int64 => {
1383                    let array = column.as_primitive::<Int64Type>();
1384                    write_primitive(typed, array.values(), levels)
1385                }
1386                ArrowDataType::UInt64 => {
1387                    let values = column.as_primitive::<UInt64Type>().values();
1388                    // follow C++ implementation and use overflow/reinterpret cast from u64 to i64 which will map
1389                    // `(i64::MAX as u64)..u64::MAX` to `i64::MIN..0`
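                    // e.g. u64::MAX is written as -1i64 and `i64::MAX as u64 + 1` as i64::MIN;
                    // a reader can recover the original value by casting back to u64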
1390                    let array = values.inner().typed_data::<i64>();
1391                    write_primitive(typed, array, levels)
1392                }
1393                ArrowDataType::Time64(TimeUnit::Microsecond) => {
1394                    let array = column.as_primitive::<Time64MicrosecondType>();
1395                    write_primitive(typed, array.values(), levels)
1396                }
1397                ArrowDataType::Time64(TimeUnit::Nanosecond) => {
1398                    let array = column.as_primitive::<Time64NanosecondType>();
1399                    write_primitive(typed, array.values(), levels)
1400                }
1401                ArrowDataType::Timestamp(unit, _) => match unit {
1402                    TimeUnit::Second => {
1403                        let array = column.as_primitive::<TimestampSecondType>();
1404                        write_primitive(typed, array.values(), levels)
1405                    }
1406                    TimeUnit::Millisecond => {
1407                        let array = column.as_primitive::<TimestampMillisecondType>();
1408                        write_primitive(typed, array.values(), levels)
1409                    }
1410                    TimeUnit::Microsecond => {
1411                        let array = column.as_primitive::<TimestampMicrosecondType>();
1412                        write_primitive(typed, array.values(), levels)
1413                    }
1414                    TimeUnit::Nanosecond => {
1415                        let array = column.as_primitive::<TimestampNanosecondType>();
1416                        write_primitive(typed, array.values(), levels)
1417                    }
1418                },
1419                ArrowDataType::Duration(unit) => match unit {
1420                    TimeUnit::Second => {
1421                        let array = column.as_primitive::<DurationSecondType>();
1422                        write_primitive(typed, array.values(), levels)
1423                    }
1424                    TimeUnit::Millisecond => {
1425                        let array = column.as_primitive::<DurationMillisecondType>();
1426                        write_primitive(typed, array.values(), levels)
1427                    }
1428                    TimeUnit::Microsecond => {
1429                        let array = column.as_primitive::<DurationMicrosecondType>();
1430                        write_primitive(typed, array.values(), levels)
1431                    }
1432                    TimeUnit::Nanosecond => {
1433                        let array = column.as_primitive::<DurationNanosecondType>();
1434                        write_primitive(typed, array.values(), levels)
1435                    }
1436                },
1437                ArrowDataType::Decimal64(_, _) => {
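                    // Decimal64 values are already 64-bit; reinterpret the underlying i64 directly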
1438                    let array = column
1439                        .as_primitive::<Decimal64Type>()
1440                        .reinterpret_cast::<Int64Type>();
1441                    write_primitive(typed, array.values(), levels)
1442                }
1443                ArrowDataType::Decimal128(_, _) => {
1444                    // the precision is low enough for the value to fit in an int64
1445                    let array = column
1446                        .as_primitive::<Decimal128Type>()
1447                        .unary::<_, Int64Type>(|v| v as i64);
1448                    write_primitive(typed, array.values(), levels)
1449                }
1450                ArrowDataType::Decimal256(_, _) => {
1451                    // the precision is low enough for the value to fit in an int64
1452                    let array = column
1453                        .as_primitive::<Decimal256Type>()
1454                        .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1455                    write_primitive(typed, array.values(), levels)
1456                }
1457                d => Err(ParquetError::General(format!("Cannot coerce {d} to I64"))),
1458            }
1459        }
1460        ColumnWriter::Int96ColumnWriter(_typed) => {
1461            unreachable!("Currently unreachable because data type not supported")
1462        }
1463        ColumnWriter::FloatColumnWriter(typed) => {
1464            let array = column.as_primitive::<Float32Type>();
1465            write_primitive(typed, array.values(), levels)
1466        }
1467        ColumnWriter::DoubleColumnWriter(typed) => {
1468            let array = column.as_primitive::<Float64Type>();
1469            write_primitive(typed, array.values(), levels)
1470        }
1471        ColumnWriter::ByteArrayColumnWriter(_) => {
1472            unreachable!("should use ByteArrayWriter")
1473        }
1474        ColumnWriter::FixedLenByteArrayColumnWriter(typed) => {
1475            let bytes = match column.data_type() {
1476                ArrowDataType::Interval(interval_unit) => match interval_unit {
1477                    IntervalUnit::YearMonth => {
1478                        let array = column.as_primitive::<IntervalYearMonthType>();
1479                        get_interval_ym_array_slice(array, indices)
1480                    }
1481                    IntervalUnit::DayTime => {
1482                        let array = column.as_primitive::<IntervalDayTimeType>();
1483                        get_interval_dt_array_slice(array, indices)
1484                    }
1485                    _ => {
1486                        return Err(ParquetError::NYI(format!(
1487                            "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
1488                        )));
1489                    }
1490                },
1491                ArrowDataType::FixedSizeBinary(_) => {
1492                    let array = column.as_fixed_size_binary();
1493                    get_fsb_array_slice(array, indices)
1494                }
1495                ArrowDataType::Decimal32(_, _) => {
1496                    let array = column.as_primitive::<Decimal32Type>();
1497                    get_decimal_32_array_slice(array, indices)
1498                }
1499                ArrowDataType::Decimal64(_, _) => {
1500                    let array = column.as_primitive::<Decimal64Type>();
1501                    get_decimal_64_array_slice(array, indices)
1502                }
1503                ArrowDataType::Decimal128(_, _) => {
1504                    let array = column.as_primitive::<Decimal128Type>();
1505                    get_decimal_128_array_slice(array, indices)
1506                }
1507                ArrowDataType::Decimal256(_, _) => {
1508                    let array = column.as_primitive::<Decimal256Type>();
1509                    get_decimal_256_array_slice(array, indices)
1510                }
1511                ArrowDataType::Float16 => {
1512                    let array = column.as_primitive::<Float16Type>();
1513                    get_float_16_array_slice(array, indices)
1514                }
1515                _ => {
1516                    return Err(ParquetError::NYI(
1517                        "Attempting to write an Arrow type that is not yet implemented".to_string(),
1518                    ));
1519                }
1520            };
1521            typed.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels())
1522        }
1523    }
1524}
1525
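/// Helper that forwards a primitive column chunk to the underlying column writer,
/// passing the non-null value indices along with the definition and repetition levels.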
1526fn write_primitive<E: ColumnValueEncoder>(
1527    writer: &mut GenericColumnWriter<E>,
1528    values: &E::Values,
1529    levels: &ArrayLevels,
1530) -> Result<usize> {
1531    writer.write_batch_internal(
1532        values,
1533        Some(levels.non_null_indices()),
1534        levels.def_levels(),
1535        levels.rep_levels(),
1536        None,
1537        None,
1538        None,
1539    )
1540}
1541
1542fn get_bool_array_slice(array: &arrow_array::BooleanArray, indices: &[usize]) -> Vec<bool> {
1543    let mut values = Vec::with_capacity(indices.len());
1544    for i in indices {
1545        values.push(array.value(*i))
1546    }
1547    values
1548}
1549
1550/// Returns 12-byte values packing months, days and milliseconds (4 bytes each, little-endian).
1551/// An Arrow YearMonth interval only stores months, thus only the first 4 bytes are populated.
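/// For example, an interval of 14 months becomes
/// `[0x0E, 0x00, 0x00, 0x00, 0, 0, 0, 0, 0, 0, 0, 0]`.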
1552fn get_interval_ym_array_slice(
1553    array: &arrow_array::IntervalYearMonthArray,
1554    indices: &[usize],
1555) -> Vec<FixedLenByteArray> {
1556    let mut values = Vec::with_capacity(indices.len());
1557    for i in indices {
1558        let mut value = array.value(*i).to_le_bytes().to_vec();
1559        let mut suffix = vec![0; 8];
1560        value.append(&mut suffix);
1561        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1562    }
1563    values
1564}
1565
1566/// Returns 12-byte values packing months, days and milliseconds (4 bytes each, little-endian).
1567/// An Arrow DayTime interval only stores days and millis, thus the first 4 bytes are not populated.
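/// For example, `IntervalDayTime { days: 3, milliseconds: 500 }` becomes
/// `[0, 0, 0, 0, 0x03, 0x00, 0x00, 0x00, 0xF4, 0x01, 0x00, 0x00]`.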
1568fn get_interval_dt_array_slice(
1569    array: &arrow_array::IntervalDayTimeArray,
1570    indices: &[usize],
1571) -> Vec<FixedLenByteArray> {
1572    let mut values = Vec::with_capacity(indices.len());
1573    for i in indices {
1574        let mut out = [0; 12];
1575        let value = array.value(*i);
1576        out[4..8].copy_from_slice(&value.days.to_le_bytes());
1577        out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1578        values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1579    }
1580    values
1581}
1582
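/// Converts decimal values to big-endian byte arrays truncated to the minimal length
/// implied by the array's precision. For example, with precision 5 the length is 3 bytes,
/// so the value 10_000 (`0x00002710` as a big-endian i32) is written as `[0x00, 0x27, 0x10]`.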
1583fn get_decimal_32_array_slice(
1584    array: &arrow_array::Decimal32Array,
1585    indices: &[usize],
1586) -> Vec<FixedLenByteArray> {
1587    let mut values = Vec::with_capacity(indices.len());
1588    let size = decimal_length_from_precision(array.precision());
1589    for i in indices {
1590        let as_be_bytes = array.value(*i).to_be_bytes();
1591        let resized_value = as_be_bytes[(4 - size)..].to_vec();
1592        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1593    }
1594    values
1595}
1596
1597fn get_decimal_64_array_slice(
1598    array: &arrow_array::Decimal64Array,
1599    indices: &[usize],
1600) -> Vec<FixedLenByteArray> {
1601    let mut values = Vec::with_capacity(indices.len());
1602    let size = decimal_length_from_precision(array.precision());
1603    for i in indices {
1604        let as_be_bytes = array.value(*i).to_be_bytes();
1605        let resized_value = as_be_bytes[(8 - size)..].to_vec();
1606        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1607    }
1608    values
1609}
1610
1611fn get_decimal_128_array_slice(
1612    array: &arrow_array::Decimal128Array,
1613    indices: &[usize],
1614) -> Vec<FixedLenByteArray> {
1615    let mut values = Vec::with_capacity(indices.len());
1616    let size = decimal_length_from_precision(array.precision());
1617    for i in indices {
1618        let as_be_bytes = array.value(*i).to_be_bytes();
1619        let resized_value = as_be_bytes[(16 - size)..].to_vec();
1620        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1621    }
1622    values
1623}
1624
1625fn get_decimal_256_array_slice(
1626    array: &arrow_array::Decimal256Array,
1627    indices: &[usize],
1628) -> Vec<FixedLenByteArray> {
1629    let mut values = Vec::with_capacity(indices.len());
1630    let size = decimal_length_from_precision(array.precision());
1631    for i in indices {
1632        let as_be_bytes = array.value(*i).to_be_bytes();
1633        let resized_value = as_be_bytes[(32 - size)..].to_vec();
1634        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1635    }
1636    values
1637}
1638
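/// Converts Float16 values to their 2-byte little-endian representation.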
1639fn get_float_16_array_slice(
1640    array: &arrow_array::Float16Array,
1641    indices: &[usize],
1642) -> Vec<FixedLenByteArray> {
1643    let mut values = Vec::with_capacity(indices.len());
1644    for i in indices {
1645        let value = array.value(*i).to_le_bytes().to_vec();
1646        values.push(FixedLenByteArray::from(ByteArray::from(value)));
1647    }
1648    values
1649}
1650
1651fn get_fsb_array_slice(
1652    array: &arrow_array::FixedSizeBinaryArray,
1653    indices: &[usize],
1654) -> Vec<FixedLenByteArray> {
1655    let mut values = Vec::with_capacity(indices.len());
1656    for i in indices {
1657        let value = array.value(*i).to_vec();
1658        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1659    }
1660    values
1661}
1662
1663#[cfg(test)]
1664mod tests {
1665    use super::*;
1666    use std::collections::HashMap;
1667
1668    use std::fs::File;
1669
1670    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1671    use crate::arrow::{ARROW_SCHEMA_META_KEY, PARQUET_FIELD_ID_META_KEY};
1672    use crate::column::page::{Page, PageReader};
1673    use crate::file::metadata::thrift::PageHeader;
1674    use crate::file::page_index::column_index::ColumnIndexMetaData;
1675    use crate::file::reader::SerializedPageReader;
1676    use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
1677    use crate::schema::types::ColumnPath;
1678    use arrow::datatypes::ToByteSlice;
1679    use arrow::datatypes::{DataType, Schema};
1680    use arrow::error::Result as ArrowResult;
1681    use arrow::util::data_gen::create_random_array;
1682    use arrow::util::pretty::pretty_format_batches;
1683    use arrow::{array::*, buffer::Buffer};
1684    use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer, OffsetBuffer, i256};
1685    use arrow_schema::Fields;
1686    use half::f16;
1687    use num_traits::{FromPrimitive, ToPrimitive};
1688    use tempfile::tempfile;
1689
1690    use crate::basic::Encoding;
1691    use crate::data_type::AsBytes;
1692    use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
1693    use crate::file::properties::{
1694        BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1695    };
1696    use crate::file::serialized_reader::ReadOptionsBuilder;
1697    use crate::file::{
1698        reader::{FileReader, SerializedFileReader},
1699        statistics::Statistics,
1700    };
1701
1702    #[test]
1703    fn arrow_writer() {
1704        // define schema
1705        let schema = Schema::new(vec![
1706            Field::new("a", DataType::Int32, false),
1707            Field::new("b", DataType::Int32, true),
1708        ]);
1709
1710        // create some data
1711        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1712        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1713
1714        // build a record batch
1715        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
1716
1717        roundtrip(batch, Some(SMALL_SIZE / 2));
1718    }
1719
1720    fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1721        let mut buffer = vec![];
1722
1723        let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
1724        writer.write(expected_batch).unwrap();
1725        writer.close().unwrap();
1726
1727        buffer
1728    }
1729
1730    fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1731        let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
1732        writer.write(expected_batch).unwrap();
1733        writer.into_inner().unwrap()
1734    }
1735
1736    #[test]
1737    fn roundtrip_bytes() {
1738        // define schema
1739        let schema = Arc::new(Schema::new(vec![
1740            Field::new("a", DataType::Int32, false),
1741            Field::new("b", DataType::Int32, true),
1742        ]));
1743
1744        // create some data
1745        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1746        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1747
1748        // build a record batch
1749        let expected_batch =
1750            RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
1751
1752        for buffer in [
1753            get_bytes_after_close(schema.clone(), &expected_batch),
1754            get_bytes_by_into_inner(schema, &expected_batch),
1755        ] {
1756            let cursor = Bytes::from(buffer);
1757            let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
1758
1759            let actual_batch = record_batch_reader
1760                .next()
1761                .expect("No batch found")
1762                .expect("Unable to get batch");
1763
1764            assert_eq!(expected_batch.schema(), actual_batch.schema());
1765            assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
1766            assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
1767            for i in 0..expected_batch.num_columns() {
1768                let expected_data = expected_batch.column(i).to_data();
1769                let actual_data = actual_batch.column(i).to_data();
1770
1771                assert_eq!(expected_data, actual_data);
1772            }
1773        }
1774    }
1775
1776    #[test]
1777    fn arrow_writer_non_null() {
1778        // define schema
1779        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1780
1781        // create some data
1782        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1783
1784        // build a record batch
1785        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1786
1787        roundtrip(batch, Some(SMALL_SIZE / 2));
1788    }
1789
1790    #[test]
1791    fn arrow_writer_list() {
1792        // define schema
1793        let schema = Schema::new(vec![Field::new(
1794            "a",
1795            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1796            true,
1797        )]);
1798
1799        // create some data
1800        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1801
1802        // Construct a buffer for value offsets, for the nested array:
1803        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
1804        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1805
1806        // Construct a list array from the above two
1807        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1808            DataType::Int32,
1809            false,
1810        ))))
1811        .len(5)
1812        .add_buffer(a_value_offsets)
1813        .add_child_data(a_values.into_data())
1814        .null_bit_buffer(Some(Buffer::from([0b00011011])))
1815        .build()
1816        .unwrap();
1817        let a = ListArray::from(a_list_data);
1818
1819        // build a record batch
1820        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1821
1822        assert_eq!(batch.column(0).null_count(), 1);
1823
1824        // This test fails if the max row group size is less than the batch's length
1825        // see https://github.com/apache/arrow-rs/issues/518
1826        roundtrip(batch, None);
1827    }
1828
1829    #[test]
1830    fn arrow_writer_list_non_null() {
1831        // define schema
1832        let schema = Schema::new(vec![Field::new(
1833            "a",
1834            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1835            false,
1836        )]);
1837
1838        // create some data
1839        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1840
1841        // Construct a buffer for value offsets, for the nested array:
1842        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
1843        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1844
1845        // Construct a list array from the above two
1846        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1847            DataType::Int32,
1848            false,
1849        ))))
1850        .len(5)
1851        .add_buffer(a_value_offsets)
1852        .add_child_data(a_values.into_data())
1853        .build()
1854        .unwrap();
1855        let a = ListArray::from(a_list_data);
1856
1857        // build a record batch
1858        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1859
1860        // This test fails if the max row group size is less than the batch's length
1861        // see https://github.com/apache/arrow-rs/issues/518
1862        assert_eq!(batch.column(0).null_count(), 0);
1863
1864        roundtrip(batch, None);
1865    }
1866
1867    #[test]
1868    fn arrow_writer_list_view() {
1869        let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
1870        let schema = Schema::new(vec![Field::new(
1871            "a",
1872            DataType::ListView(list_field.clone()),
1873            true,
1874        )]);
1875
1876        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
1877        let a = ListViewArray::new(
1878            list_field,
1879            vec![0, 1, 0, 3, 6].into(),
1880            vec![1, 2, 0, 3, 4].into(),
1881            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1882            Some(vec![true, true, false, true, true].into()),
1883        );
1884
1885        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1886
1887        assert_eq!(batch.column(0).null_count(), 1);
1888
1889        roundtrip(batch, None);
1890    }
1891
1892    #[test]
1893    fn arrow_writer_list_view_non_null() {
1894        let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
1895        let schema = Schema::new(vec![Field::new(
1896            "a",
1897            DataType::ListView(list_field.clone()),
1898            false,
1899        )]);
1900
1901        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
1902        let a = ListViewArray::new(
1903            list_field,
1904            vec![0, 1, 0, 3, 6].into(),
1905            vec![1, 2, 0, 3, 4].into(),
1906            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1907            None,
1908        );
1909
1910        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1911
1912        assert_eq!(batch.column(0).null_count(), 0);
1913
1914        roundtrip(batch, None);
1915    }
1916
1917    #[test]
1918    fn arrow_writer_list_view_out_of_order() {
1919        let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
1920        let schema = Schema::new(vec![Field::new(
1921            "a",
1922            DataType::ListView(list_field.clone()),
1923            false,
1924        )]);
1925
1926        // [[1], [2, 3], [], [7, 8, 9, 10], [4, 5, 6]] - out of order offsets
1927        let a = ListViewArray::new(
1928            list_field,
1929            vec![0, 1, 0, 6, 3].into(),
1930            vec![1, 2, 0, 4, 3].into(),
1931            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1932            None,
1933        );
1934
1935        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1936
1937        roundtrip(batch, None);
1938    }
1939
1940    #[test]
1941    fn arrow_writer_large_list_view() {
1942        let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
1943        let schema = Schema::new(vec![Field::new(
1944            "a",
1945            DataType::LargeListView(list_field.clone()),
1946            true,
1947        )]);
1948
1949        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
1950        let a = LargeListViewArray::new(
1951            list_field,
1952            vec![0i64, 1, 0, 3, 6].into(),
1953            vec![1i64, 2, 0, 3, 4].into(),
1954            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1955            Some(vec![true, true, false, true, true].into()),
1956        );
1957
1958        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1959
1960        assert_eq!(batch.column(0).null_count(), 1);
1961
1962        roundtrip(batch, None);
1963    }
1964
1965    #[test]
1966    fn arrow_writer_list_view_with_struct() {
1967        // Test ListView containing Struct: ListView<Struct<Int32, Utf8>>
1968        let struct_fields = Fields::from(vec![
1969            Field::new("id", DataType::Int32, false),
1970            Field::new("name", DataType::Utf8, false),
1971        ]);
1972        let struct_type = DataType::Struct(struct_fields.clone());
1973        let list_field = Arc::new(Field::new("item", struct_type.clone(), false));
1974
1975        let schema = Schema::new(vec![Field::new(
1976            "a",
1977            DataType::ListView(list_field.clone()),
1978            true,
1979        )]);
1980
1981        // Create struct values
1982        let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
1983        let name_array = StringArray::from(vec!["a", "b", "c", "d", "e"]);
1984        let struct_array = StructArray::new(
1985            struct_fields,
1986            vec![Arc::new(id_array), Arc::new(name_array)],
1987            None,
1988        );
1989
1990        // Create ListView: [[{1, "a"}, {2, "b"}], null, [{3, "c"}, {4, "d"}, {5, "e"}]]
1991        let list_view = ListViewArray::new(
1992            list_field,
1993            vec![0, 2, 2].into(), // offsets
1994            vec![2, 0, 3].into(), // sizes
1995            Arc::new(struct_array),
1996            Some(vec![true, false, true].into()),
1997        );
1998
1999        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_view)]).unwrap();
2000
2001        roundtrip(batch, None);
2002    }
2003
2004    #[test]
2005    fn arrow_writer_binary() {
2006        let string_field = Field::new("a", DataType::Utf8, false);
2007        let binary_field = Field::new("b", DataType::Binary, false);
2008        let schema = Schema::new(vec![string_field, binary_field]);
2009
2010        let raw_string_values = vec!["foo", "bar", "baz", "quux"];
2011        let raw_binary_values = [
2012            b"foo".to_vec(),
2013            b"bar".to_vec(),
2014            b"baz".to_vec(),
2015            b"quux".to_vec(),
2016        ];
2017        let raw_binary_value_refs = raw_binary_values
2018            .iter()
2019            .map(|x| x.as_slice())
2020            .collect::<Vec<_>>();
2021
2022        let string_values = StringArray::from(raw_string_values.clone());
2023        let binary_values = BinaryArray::from(raw_binary_value_refs);
2024        let batch = RecordBatch::try_new(
2025            Arc::new(schema),
2026            vec![Arc::new(string_values), Arc::new(binary_values)],
2027        )
2028        .unwrap();
2029
2030        roundtrip(batch, Some(SMALL_SIZE / 2));
2031    }
2032
2033    #[test]
2034    fn arrow_writer_binary_view() {
2035        let string_field = Field::new("a", DataType::Utf8View, false);
2036        let binary_field = Field::new("b", DataType::BinaryView, false);
2037        let nullable_string_field = Field::new("a", DataType::Utf8View, true);
2038        let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
2039
2040        let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
2041        let raw_binary_values = vec![
2042            b"foo".to_vec(),
2043            b"bar".to_vec(),
2044            b"large payload over 12 bytes".to_vec(),
2045            b"lulu".to_vec(),
2046        ];
2047        let nullable_string_values =
2048            vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
2049
2050        let string_view_values = StringViewArray::from(raw_string_values);
2051        let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
2052        let nullable_string_view_values = StringViewArray::from(nullable_string_values);
2053        let batch = RecordBatch::try_new(
2054            Arc::new(schema),
2055            vec![
2056                Arc::new(string_view_values),
2057                Arc::new(binary_view_values),
2058                Arc::new(nullable_string_view_values),
2059            ],
2060        )
2061        .unwrap();
2062
2063        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
2064        roundtrip(batch, None);
2065    }
2066
2067    #[test]
2068    fn arrow_writer_binary_view_long_value() {
2069        let string_field = Field::new("a", DataType::Utf8View, false);
2070        let binary_field = Field::new("b", DataType::BinaryView, false);
2071        let schema = Schema::new(vec![string_field, binary_field]);
2072
2073        // There is special-case validation for long values (128 bytes or more):
2074        // a length of 128 encodes as 0x80 0x00 0x00 0x00 in little endian, which should
2075        // trigger the long-string UTF-8 validation branch in the plain decoder.
2076        let long = "a".repeat(128);
2077        let raw_string_values = vec!["foo", long.as_str(), "bar"];
2078        let raw_binary_values = vec![b"foo".to_vec(), long.as_bytes().to_vec(), b"bar".to_vec()];
2079
2080        let string_view_values: ArrayRef = Arc::new(StringViewArray::from(raw_string_values));
2081        let binary_view_values: ArrayRef =
2082            Arc::new(BinaryViewArray::from_iter_values(raw_binary_values));
2083
2084        one_column_roundtrip(Arc::clone(&string_view_values), false);
2085        one_column_roundtrip(Arc::clone(&binary_view_values), false);
2086
2087        let batch = RecordBatch::try_new(
2088            Arc::new(schema),
2089            vec![string_view_values, binary_view_values],
2090        )
2091        .unwrap();
2092
2093        // Disable dictionary to exercise plain encoding paths in the reader.
2094        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2095            let props = WriterProperties::builder()
2096                .set_writer_version(version)
2097                .set_dictionary_enabled(false)
2098                .build();
2099            roundtrip_opts(&batch, props);
2100        }
2101    }
2102
2103    fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
2104        let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
2105        let schema = Schema::new(vec![decimal_field]);
2106
2107        let decimal_values = vec![10_000, 50_000, 0, -100]
2108            .into_iter()
2109            .map(Some)
2110            .collect::<Decimal128Array>()
2111            .with_precision_and_scale(precision, scale)
2112            .unwrap();
2113
2114        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
2115    }
2116
2117    #[test]
2118    fn arrow_writer_decimal() {
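        // Decimal128 is stored as INT32 when the precision fits in 9 digits, INT64 when it
        // fits in 18 digits, and FIXED_LEN_BYTE_ARRAY otherwise; exercise all three paths.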
2119        // int32 to store the decimal value
2120        let batch_int32_decimal = get_decimal_batch(5, 2);
2121        roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
2122        // int64 to store the decimal value
2123        let batch_int64_decimal = get_decimal_batch(12, 2);
2124        roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
2125        // fixed_length_byte_array to store the decimal value
2126        let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
2127        roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
2128    }
2129
2130    #[test]
2131    fn arrow_writer_complex() {
2132        // define schema
2133        let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
2134        let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
2135        let struct_field_g = Arc::new(Field::new_list(
2136            "g",
2137            Field::new_list_field(DataType::Int16, true),
2138            false,
2139        ));
2140        let struct_field_h = Arc::new(Field::new_list(
2141            "h",
2142            Field::new_list_field(DataType::Int16, false),
2143            true,
2144        ));
2145        let struct_field_e = Arc::new(Field::new_struct(
2146            "e",
2147            vec![
2148                struct_field_f.clone(),
2149                struct_field_g.clone(),
2150                struct_field_h.clone(),
2151            ],
2152            false,
2153        ));
2154        let schema = Schema::new(vec![
2155            Field::new("a", DataType::Int32, false),
2156            Field::new("b", DataType::Int32, true),
2157            Field::new_struct(
2158                "c",
2159                vec![struct_field_d.clone(), struct_field_e.clone()],
2160                false,
2161            ),
2162        ]);
2163
2164        // create some data
2165        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2166        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2167        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
2168        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
2169
2170        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2171
2172        // Construct a buffer for value offsets, for the nested array:
2173        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
2174        let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2175
2176        // Construct a list array from the above two
2177        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
2178            .len(5)
2179            .add_buffer(g_value_offsets.clone())
2180            .add_child_data(g_value.to_data())
2181            .build()
2182            .unwrap();
2183        let g = ListArray::from(g_list_data);
2184        // The difference between g and h is that h has a null bitmap
2185        let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
2186            .len(5)
2187            .add_buffer(g_value_offsets)
2188            .add_child_data(g_value.to_data())
2189            .null_bit_buffer(Some(Buffer::from([0b00011011])))
2190            .build()
2191            .unwrap();
2192        let h = ListArray::from(h_list_data);
2193
2194        let e = StructArray::from(vec![
2195            (struct_field_f, Arc::new(f) as ArrayRef),
2196            (struct_field_g, Arc::new(g) as ArrayRef),
2197            (struct_field_h, Arc::new(h) as ArrayRef),
2198        ]);
2199
2200        let c = StructArray::from(vec![
2201            (struct_field_d, Arc::new(d) as ArrayRef),
2202            (struct_field_e, Arc::new(e) as ArrayRef),
2203        ]);
2204
2205        // build a record batch
2206        let batch = RecordBatch::try_new(
2207            Arc::new(schema),
2208            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
2209        )
2210        .unwrap();
2211
2212        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
2213        roundtrip(batch, Some(SMALL_SIZE / 3));
2214    }
2215
2216    #[test]
2217    fn arrow_writer_complex_mixed() {
2218        // This test was added while investigating https://github.com/apache/arrow-rs/issues/244.
2219        // It was subsequently fixed while investigating https://github.com/apache/arrow-rs/issues/245.
2220
2221        // define schema
2222        let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
2223        let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
2224        let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
2225        let schema = Schema::new(vec![Field::new(
2226            "some_nested_object",
2227            DataType::Struct(Fields::from(vec![
2228                offset_field.clone(),
2229                partition_field.clone(),
2230                topic_field.clone(),
2231            ])),
2232            false,
2233        )]);
2234
2235        // create some data
2236        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
2237        let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2238        let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
2239
2240        let some_nested_object = StructArray::from(vec![
2241            (offset_field, Arc::new(offset) as ArrayRef),
2242            (partition_field, Arc::new(partition) as ArrayRef),
2243            (topic_field, Arc::new(topic) as ArrayRef),
2244        ]);
2245
2246        // build a record batch
2247        let batch =
2248            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
2249
2250        roundtrip(batch, Some(SMALL_SIZE / 2));
2251    }
2252
2253    #[test]
2254    fn arrow_writer_map() {
2255        // Note: we are using the JSON Arrow reader for brevity
2256        let json_content = r#"
2257        {"stocks":{"long": "$AAA", "short": "$BBB"}}
2258        {"stocks":{"long": null, "long": "$CCC", "short": null}}
2259        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
2260        "#;
2261        let entries_struct_type = DataType::Struct(Fields::from(vec![
2262            Field::new("key", DataType::Utf8, false),
2263            Field::new("value", DataType::Utf8, true),
2264        ]));
2265        let stocks_field = Field::new(
2266            "stocks",
2267            DataType::Map(
2268                Arc::new(Field::new("entries", entries_struct_type, false)),
2269                false,
2270            ),
2271            true,
2272        );
2273        let schema = Arc::new(Schema::new(vec![stocks_field]));
2274        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
2275        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
2276
2277        let batch = reader.next().unwrap().unwrap();
2278        roundtrip(batch, None);
2279    }
2280
2281    #[test]
2282    fn arrow_writer_2_level_struct() {
2283        // tests writing struct<struct<primitive>>
2284        let field_c = Field::new("c", DataType::Int32, true);
2285        let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
2286        let type_a = DataType::Struct(vec![field_b.clone()].into());
2287        let field_a = Field::new("a", type_a, true);
2288        let schema = Schema::new(vec![field_a.clone()]);
2289
2290        // create data
2291        let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
2292        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2293            .len(6)
2294            .null_bit_buffer(Some(Buffer::from([0b00100111])))
2295            .add_child_data(c.into_data())
2296            .build()
2297            .unwrap();
2298        let b = StructArray::from(b_data);
2299        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2300            .len(6)
2301            .null_bit_buffer(Some(Buffer::from([0b00101111])))
2302            .add_child_data(b.into_data())
2303            .build()
2304            .unwrap();
2305        let a = StructArray::from(a_data);
2306
2307        assert_eq!(a.null_count(), 1);
2308        assert_eq!(a.column(0).null_count(), 2);
2309
2310        // build a record batch
2311        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2312
2313        roundtrip(batch, Some(SMALL_SIZE / 2));
2314    }
2315
2316    #[test]
2317    fn arrow_writer_2_level_struct_non_null() {
2318        // tests writing struct<struct<primitive>>
2319        let field_c = Field::new("c", DataType::Int32, false);
2320        let type_b = DataType::Struct(vec![field_c].into());
2321        let field_b = Field::new("b", type_b.clone(), false);
2322        let type_a = DataType::Struct(vec![field_b].into());
2323        let field_a = Field::new("a", type_a.clone(), false);
2324        let schema = Schema::new(vec![field_a]);
2325
2326        // create data
2327        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2328        let b_data = ArrayDataBuilder::new(type_b)
2329            .len(6)
2330            .add_child_data(c.into_data())
2331            .build()
2332            .unwrap();
2333        let b = StructArray::from(b_data);
2334        let a_data = ArrayDataBuilder::new(type_a)
2335            .len(6)
2336            .add_child_data(b.into_data())
2337            .build()
2338            .unwrap();
2339        let a = StructArray::from(a_data);
2340
2341        assert_eq!(a.null_count(), 0);
2342        assert_eq!(a.column(0).null_count(), 0);
2343
2344        // build a record batch
2345        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2346
2347        roundtrip(batch, Some(SMALL_SIZE / 2));
2348    }
2349
2350    #[test]
2351    fn arrow_writer_2_level_struct_mixed_null() {
2352        // tests writing struct<struct<primitive>>
2353        let field_c = Field::new("c", DataType::Int32, false);
2354        let type_b = DataType::Struct(vec![field_c].into());
2355        let field_b = Field::new("b", type_b.clone(), true);
2356        let type_a = DataType::Struct(vec![field_b].into());
2357        let field_a = Field::new("a", type_a.clone(), false);
2358        let schema = Schema::new(vec![field_a]);
2359
2360        // create data
2361        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2362        let b_data = ArrayDataBuilder::new(type_b)
2363            .len(6)
2364            .null_bit_buffer(Some(Buffer::from([0b00100111])))
2365            .add_child_data(c.into_data())
2366            .build()
2367            .unwrap();
2368        let b = StructArray::from(b_data);
2369        // `a` intentionally has no null buffer, to test that this is handled correctly
2370        let a_data = ArrayDataBuilder::new(type_a)
2371            .len(6)
2372            .add_child_data(b.into_data())
2373            .build()
2374            .unwrap();
2375        let a = StructArray::from(a_data);
2376
2377        assert_eq!(a.null_count(), 0);
2378        assert_eq!(a.column(0).null_count(), 2);
2379
2380        // build a record batch
2381        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2382
2383        roundtrip(batch, Some(SMALL_SIZE / 2));
2384    }
2385
2386    #[test]
2387    fn arrow_writer_2_level_struct_mixed_null_2() {
2388        // tests writing struct<struct<primitive>>, where the primitive columns are non-null.
2389        let field_c = Field::new("c", DataType::Int32, false);
2390        let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
2391        let field_e = Field::new(
2392            "e",
2393            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2394            false,
2395        );
2396
2397        let field_b = Field::new(
2398            "b",
2399            DataType::Struct(vec![field_c, field_d, field_e].into()),
2400            false,
2401        );
2402        let type_a = DataType::Struct(vec![field_b.clone()].into());
2403        let field_a = Field::new("a", type_a, true);
2404        let schema = Schema::new(vec![field_a.clone()]);
2405
2406        // create data
2407        let c = Int32Array::from_iter_values(0..6);
2408        let d = FixedSizeBinaryArray::try_from_iter(
2409            ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
2410        )
2411        .expect("four byte values");
2412        let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
2413        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2414            .len(6)
2415            .add_child_data(c.into_data())
2416            .add_child_data(d.into_data())
2417            .add_child_data(e.into_data())
2418            .build()
2419            .unwrap();
2420        let b = StructArray::from(b_data);
2421        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2422            .len(6)
2423            .null_bit_buffer(Some(Buffer::from([0b00100101])))
2424            .add_child_data(b.into_data())
2425            .build()
2426            .unwrap();
2427        let a = StructArray::from(a_data);
2428
2429        assert_eq!(a.null_count(), 3);
2430        assert_eq!(a.column(0).null_count(), 0);
2431
2432        // build a record batch
2433        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2434
2435        roundtrip(batch, Some(SMALL_SIZE / 2));
2436    }
2437
2438    #[test]
2439    fn test_fixed_size_binary_in_dict() {
2440        fn test_fixed_size_binary_in_dict_inner<K>()
2441        where
2442            K: ArrowDictionaryKeyType,
2443            K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
2444            <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
2445        {
2446            let field = Field::new(
2447                "a",
2448                DataType::Dictionary(
2449                    Box::new(K::DATA_TYPE),
2450                    Box::new(DataType::FixedSizeBinary(4)),
2451                ),
2452                false,
2453            );
2454            let schema = Schema::new(vec![field]);
2455
2456            let keys: Vec<K::Native> = vec![
2457                K::Native::try_from(0u8).unwrap(),
2458                K::Native::try_from(0u8).unwrap(),
2459                K::Native::try_from(1u8).unwrap(),
2460            ];
2461            let keys = PrimitiveArray::<K>::from_iter_values(keys);
2462            let values = FixedSizeBinaryArray::try_from_iter(
2463                vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
2464            )
2465            .unwrap();
2466
2467            let data = DictionaryArray::<K>::new(keys, Arc::new(values));
2468            let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
2469            roundtrip(batch, None);
2470        }
2471
2472        test_fixed_size_binary_in_dict_inner::<UInt8Type>();
2473        test_fixed_size_binary_in_dict_inner::<UInt16Type>();
2474        test_fixed_size_binary_in_dict_inner::<UInt32Type>();
2475        test_fixed_size_binary_in_dict_inner::<UInt16Type>();
2476        test_fixed_size_binary_in_dict_inner::<Int8Type>();
2477        test_fixed_size_binary_in_dict_inner::<Int16Type>();
2478        test_fixed_size_binary_in_dict_inner::<Int32Type>();
2479        test_fixed_size_binary_in_dict_inner::<Int64Type>();
2480    }
2481
2482    #[test]
2483    fn test_empty_dict() {
2484        let struct_fields = Fields::from(vec![Field::new(
2485            "dict",
2486            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2487            false,
2488        )]);
2489
2490        let schema = Schema::new(vec![Field::new_struct(
2491            "struct",
2492            struct_fields.clone(),
2493            true,
2494        )]);
2495        let dictionary = Arc::new(DictionaryArray::new(
2496            Int32Array::new_null(5),
2497            Arc::new(StringArray::new_null(0)),
2498        ));
2499
2500        let s = StructArray::new(
2501            struct_fields,
2502            vec![dictionary],
2503            Some(NullBuffer::new_null(5)),
2504        );
2505
2506        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
2507        roundtrip(batch, None);
2508    }
2509    #[test]
2510    fn arrow_writer_page_size() {
2511        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
2512
2513        let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
2514
2515        // Generate an array of 10 unique 10-character strings
2516        for i in 0..10 {
2517            let value = i
2518                .to_string()
2519                .repeat(10)
2520                .chars()
2521                .take(10)
2522                .collect::<String>();
2523
2524            builder.append_value(value);
2525        }
2526
2527        let array = Arc::new(builder.finish());
2528
2529        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
2530
2531        let file = tempfile::tempfile().unwrap();
2532
2533        // Set everything very low so we fall back to PLAIN encoding after the first row
2534        let props = WriterProperties::builder()
2535            .set_data_page_size_limit(1)
2536            .set_dictionary_page_size_limit(1)
2537            .set_write_batch_size(1)
2538            .build();
2539
2540        let mut writer =
2541            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
2542                .expect("Unable to write file");
2543        writer.write(&batch).unwrap();
2544        writer.close().unwrap();
2545
2546        let options = ReadOptionsBuilder::new().with_page_index().build();
2547        let reader =
2548            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap();
2549
2550        let column = reader.metadata().row_group(0).columns();
2551
2552        assert_eq!(column.len(), 1);
2553
2554        // We should write one row before falling back to PLAIN encoding so there should still be a
2555        // dictionary page.
2556        assert!(
2557            column[0].dictionary_page_offset().is_some(),
2558            "Expected a dictionary page"
2559        );
2560
2561        assert!(reader.metadata().offset_index().is_some());
2562        let offset_indexes = &reader.metadata().offset_index().unwrap()[0];
2563
2564        let page_locations = offset_indexes[0].page_locations.clone();
2565
2566        // We should fall back to PLAIN encoding after the first row, and our max page size is 1 byte,
2567        // so we expect one dictionary-encoded page and then a page per row thereafter.
2568        assert_eq!(
2569            page_locations.len(),
2570            10,
2571            "Expected 10 pages but got {page_locations:#?}"
2572        );
2573    }
2574
2575    #[test]
2576    fn arrow_writer_float_nans() {
2577        let f16_field = Field::new("a", DataType::Float16, false);
2578        let f32_field = Field::new("b", DataType::Float32, false);
2579        let f64_field = Field::new("c", DataType::Float64, false);
2580        let schema = Schema::new(vec![f16_field, f32_field, f64_field]);
2581
2582        let f16_values = (0..MEDIUM_SIZE)
2583            .map(|i| {
2584                Some(if i % 2 == 0 {
2585                    f16::NAN
2586                } else {
2587                    f16::from_f32(i as f32)
2588                })
2589            })
2590            .collect::<Float16Array>();
2591
2592        let f32_values = (0..MEDIUM_SIZE)
2593            .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
2594            .collect::<Float32Array>();
2595
2596        let f64_values = (0..MEDIUM_SIZE)
2597            .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
2598            .collect::<Float64Array>();
2599
2600        let batch = RecordBatch::try_new(
2601            Arc::new(schema),
2602            vec![
2603                Arc::new(f16_values),
2604                Arc::new(f32_values),
2605                Arc::new(f64_values),
2606            ],
2607        )
2608        .unwrap();
2609
2610        roundtrip(batch, None);
2611    }
2612
2613    const SMALL_SIZE: usize = 7;
2614    const MEDIUM_SIZE: usize = 63;
2615
2616    // Write the batch to parquet and read it back out, ensuring
2617    // that what comes out is the same as what was written in
2618    fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<Bytes> {
2619        let mut files = vec![];
2620        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2621            let mut props = WriterProperties::builder().set_writer_version(version);
2622
2623            if let Some(size) = max_row_group_size {
2624                props = props.set_max_row_group_row_count(Some(size))
2625            }
2626
2627            let props = props.build();
2628            files.push(roundtrip_opts(&expected_batch, props))
2629        }
2630        files
2631    }
2632
2633    // Round trip the specified record batch with the specified writer properties,
2634    // to an in-memory file, and validate the arrays using the specified function.
2635    // Returns the in-memory file.
2636    fn roundtrip_opts_with_array_validation<F>(
2637        expected_batch: &RecordBatch,
2638        props: WriterProperties,
2639        validate: F,
2640    ) -> Bytes
2641    where
2642        F: Fn(&ArrayData, &ArrayData),
2643    {
2644        let mut file = vec![];
2645
2646        let mut writer = ArrowWriter::try_new(&mut file, expected_batch.schema(), Some(props))
2647            .expect("Unable to write file");
2648        writer.write(expected_batch).unwrap();
2649        writer.close().unwrap();
2650
2651        let file = Bytes::from(file);
2652        let mut record_batch_reader =
2653            ParquetRecordBatchReader::try_new(file.clone(), 1024).unwrap();
2654
2655        let actual_batch = record_batch_reader
2656            .next()
2657            .expect("No batch found")
2658            .expect("Unable to get batch");
2659
2660        assert_eq!(expected_batch.schema(), actual_batch.schema());
2661        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
2662        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
2663        for i in 0..expected_batch.num_columns() {
2664            let expected_data = expected_batch.column(i).to_data();
2665            let actual_data = actual_batch.column(i).to_data();
2666            validate(&expected_data, &actual_data);
2667        }
2668
2669        file
2670    }
2671
2672    fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> Bytes {
2673        roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
2674            a.validate_full().expect("valid expected data");
2675            b.validate_full().expect("valid actual data");
2676            assert_eq!(a, b)
2677        })
2678    }
2679
2680    struct RoundTripOptions {
2681        values: ArrayRef,
2682        schema: SchemaRef,
2683        bloom_filter: bool,
2684        bloom_filter_ndv: Option<u64>,
2685        bloom_filter_position: BloomFilterPosition,
2686    }
2687
2688    impl RoundTripOptions {
2689        fn new(values: ArrayRef, nullable: bool) -> Self {
2690            let data_type = values.data_type().clone();
2691            let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
2692            Self {
2693                values,
2694                schema: Arc::new(schema),
2695                bloom_filter: false,
2696                bloom_filter_ndv: None,
2697                bloom_filter_position: BloomFilterPosition::AfterRowGroup,
2698            }
2699        }
2700    }
2701
2702    fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<Bytes> {
2703        one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
2704    }
2705
2706    fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<Bytes> {
2707        let mut options = RoundTripOptions::new(values, false);
2708        options.schema = schema;
2709        one_column_roundtrip_with_options(options)
2710    }
2711
2712    fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<Bytes> {
2713        let RoundTripOptions {
2714            values,
2715            schema,
2716            bloom_filter,
2717            bloom_filter_ndv,
2718            bloom_filter_position,
2719        } = options;
2720
2721        let encodings = match values.data_type() {
2722            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
2723                vec![
2724                    Encoding::PLAIN,
2725                    Encoding::DELTA_BYTE_ARRAY,
2726                    Encoding::DELTA_LENGTH_BYTE_ARRAY,
2727                ]
2728            }
2729            DataType::Int64
2730            | DataType::Int32
2731            | DataType::Int16
2732            | DataType::Int8
2733            | DataType::UInt64
2734            | DataType::UInt32
2735            | DataType::UInt16
2736            | DataType::UInt8 => vec![
2737                Encoding::PLAIN,
2738                Encoding::DELTA_BINARY_PACKED,
2739                Encoding::BYTE_STREAM_SPLIT,
2740            ],
2741            DataType::Float32 | DataType::Float64 => {
2742                vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
2743            }
2744            _ => vec![Encoding::PLAIN],
2745        };
2746
2747        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2748
2749        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2750
2751        let mut files = vec![];
2752        for dictionary_size in [0, 1, 1024] {
2753            for encoding in &encodings {
2754                for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2755                    for row_group_size in row_group_sizes {
2756                        let mut builder = WriterProperties::builder()
2757                            .set_writer_version(version)
2758                            .set_max_row_group_row_count(Some(row_group_size))
2759                            .set_dictionary_enabled(dictionary_size != 0)
2760                            .set_dictionary_page_size_limit(dictionary_size.max(1))
2761                            .set_encoding(*encoding)
2762                            .set_bloom_filter_enabled(bloom_filter)
2763                            .set_bloom_filter_position(bloom_filter_position);
2764                        if let Some(ndv) = bloom_filter_ndv {
2765                            builder = builder.set_bloom_filter_ndv(ndv);
2766                        }
2767                        let props = builder.build();
2768
2769                        files.push(roundtrip_opts(&expected_batch, props))
2770                    }
2771                }
2772            }
2773        }
2774        files
2775    }
2776
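    /// Round trips the given values as a non-nullable column.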
2777    fn values_required<A, I>(iter: I) -> Vec<Bytes>
2778    where
2779        A: From<Vec<I::Item>> + Array + 'static,
2780        I: IntoIterator,
2781    {
2782        let raw_values: Vec<_> = iter.into_iter().collect();
2783        let values = Arc::new(A::from(raw_values));
2784        one_column_roundtrip(values, false)
2785    }
2786
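    /// Round trips the given values as a nullable column, replacing every even-indexed value
    /// with a null.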
2787    fn values_optional<A, I>(iter: I) -> Vec<Bytes>
2788    where
2789        A: From<Vec<Option<I::Item>>> + Array + 'static,
2790        I: IntoIterator,
2791    {
2792        let optional_raw_values: Vec<_> = iter
2793            .into_iter()
2794            .enumerate()
2795            .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
2796            .collect();
2797        let optional_values = Arc::new(A::from(optional_raw_values));
2798        one_column_roundtrip(optional_values, true)
2799    }
2800
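    /// Round trips the given values both as a required (non-nullable) column and as an
    /// optional column with nulls injected.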
2801    fn required_and_optional<A, I>(iter: I)
2802    where
2803        A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
2804        I: IntoIterator + Clone,
2805    {
2806        values_required::<A, I>(iter.clone());
2807        values_optional::<A, I>(iter);
2808    }
2809
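    /// Opens the first encoded file, collects the bloom filter for `file_column` from every
    /// row group, and asserts that every positive value is found in at least one filter and
    /// that no negative value is found in any of them.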
2810    fn check_bloom_filter<T: AsBytes>(
2811        files: Vec<Bytes>,
2812        file_column: String,
2813        positive_values: Vec<T>,
2814        negative_values: Vec<T>,
2815    ) {
2816        files.into_iter().take(1).for_each(|file| {
2817            let file_reader = SerializedFileReader::new_with_options(
2818                file,
2819                ReadOptionsBuilder::new()
2820                    .with_reader_properties(
2821                        ReaderProperties::builder()
2822                            .set_read_bloom_filter(true)
2823                            .build(),
2824                    )
2825                    .build(),
2826            )
2827            .expect("Unable to open file as Parquet");
2828            let metadata = file_reader.metadata();
2829
2830            // Gets bloom filters from all row groups.
2831            let mut bloom_filters: Vec<_> = vec![];
2832            for (ri, row_group) in metadata.row_groups().iter().enumerate() {
2833                if let Some((column_index, _)) = row_group
2834                    .columns()
2835                    .iter()
2836                    .enumerate()
2837                    .find(|(_, column)| column.column_path().string() == file_column)
2838                {
2839                    let row_group_reader = file_reader
2840                        .get_row_group(ri)
2841                        .expect("Unable to read row group");
2842                    if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
2843                        bloom_filters.push(sbbf.clone());
2844                    } else {
2845                        panic!("No bloom filter for column named {file_column} found");
2846                    }
2847                } else {
2848                    panic!("No column named {file_column} found");
2849                }
2850            }
2851
2852            positive_values.iter().for_each(|value| {
2853                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2854                assert!(
2855                    found.is_some(),
2856                    "Value {:?} should be in bloom filter",
2857                    value.as_bytes()
2858                );
2859            });
2860
2861            negative_values.iter().for_each(|value| {
2862                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2863                assert!(
2864                    found.is_none(),
2865                    "Value {:?} should not be in bloom filter",
2866                    value.as_bytes()
2867                );
2868            });
2869        });
2870    }
2871
2872    #[test]
2873    fn all_null_primitive_single_column() {
2874        let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
2875        one_column_roundtrip(values, true);
2876    }

2877    #[test]
2878    fn null_single_column() {
2879        let values = Arc::new(NullArray::new(SMALL_SIZE));
2880        one_column_roundtrip(values, true);
2881        // null arrays are always nullable; a test with non-nullable nulls fails
2882    }
2883
2884    #[test]
2885    fn bool_single_column() {
2886        required_and_optional::<BooleanArray, _>(
2887            [true, false].iter().cycle().copied().take(SMALL_SIZE),
2888        );
2889    }
2890
2891    #[test]
2892    fn bool_large_single_column() {
2893        let values = Arc::new(
2894            [None, Some(true), Some(false)]
2895                .iter()
2896                .cycle()
2897                .copied()
2898                .take(200_000)
2899                .collect::<BooleanArray>(),
2900        );
2901        let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
2902        let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2903        let file = tempfile::tempfile().unwrap();
2904
2905        let mut writer =
2906            ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
2907                .expect("Unable to write file");
2908        writer.write(&expected_batch).unwrap();
2909        writer.close().unwrap();
2910    }
2911
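    /// With only NaN values there are no usable min/max statistics, so no column index is
    /// expected to be written, while the offset index should still be present.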
2912    #[test]
2913    fn check_page_offset_index_with_nan() {
2914        let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
2915        let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
2916        let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2917
2918        let mut out = Vec::with_capacity(1024);
2919        let mut writer =
2920            ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
2921        writer.write(&batch).unwrap();
2922        let file_meta_data = writer.close().unwrap();
2923        for row_group in file_meta_data.row_groups() {
2924            for column in row_group.columns() {
2925                assert!(column.offset_index_offset().is_some());
2926                assert!(column.offset_index_length().is_some());
2927                assert!(column.column_index_offset().is_none());
2928                assert!(column.column_index_length().is_none());
2929            }
2930        }
2931    }
2932
2933    #[test]
2934    fn i8_single_column() {
2935        required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
2936    }
2937
2938    #[test]
2939    fn i16_single_column() {
2940        required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
2941    }
2942
2943    #[test]
2944    fn i32_single_column() {
2945        required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
2946    }
2947
2948    #[test]
2949    fn i64_single_column() {
2950        required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
2951    }
2952
2953    #[test]
2954    fn u8_single_column() {
2955        required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
2956    }
2957
2958    #[test]
2959    fn u16_single_column() {
2960        required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
2961    }
2962
2963    #[test]
2964    fn u32_single_column() {
2965        required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
2966    }
2967
2968    #[test]
2969    fn u64_single_column() {
2970        required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
2971    }
2972
2973    #[test]
2974    fn f32_single_column() {
2975        required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
2976    }
2977
2978    #[test]
2979    fn f64_single_column() {
2980        required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
2981    }
2982
2983    // The timestamp array types don't implement From<Vec<T>> because they need the timezone
2984    // argument, and they also don't support building from a Vec<Option<T>>, so call
2985    // one_column_roundtrip manually instead of calling required_and_optional for these tests.
2986
2987    #[test]
2988    fn timestamp_second_single_column() {
2989        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2990        let values = Arc::new(TimestampSecondArray::from(raw_values));
2991
2992        one_column_roundtrip(values, false);
2993    }
2994
2995    #[test]
2996    fn timestamp_millisecond_single_column() {
2997        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2998        let values = Arc::new(TimestampMillisecondArray::from(raw_values));
2999
3000        one_column_roundtrip(values, false);
3001    }
3002
3003    #[test]
3004    fn timestamp_microsecond_single_column() {
3005        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
3006        let values = Arc::new(TimestampMicrosecondArray::from(raw_values));
3007
3008        one_column_roundtrip(values, false);
3009    }
3010
3011    #[test]
3012    fn timestamp_nanosecond_single_column() {
3013        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
3014        let values = Arc::new(TimestampNanosecondArray::from(raw_values));
3015
3016        one_column_roundtrip(values, false);
3017    }
3018
3019    #[test]
3020    fn date32_single_column() {
3021        required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
3022    }
3023
3024    #[test]
3025    fn date64_single_column() {
3026        // Date64 values must be a multiple of 86400000 (milliseconds per day), see ARROW-10925
3027        required_and_optional::<Date64Array, _>(
3028            (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
3029        );
3030    }
3031
3032    #[test]
3033    fn time32_second_single_column() {
3034        required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
3035    }
3036
3037    #[test]
3038    fn time32_millisecond_single_column() {
3039        required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
3040    }
3041
3042    #[test]
3043    fn time64_microsecond_single_column() {
3044        required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
3045    }
3046
3047    #[test]
3048    fn time64_nanosecond_single_column() {
3049        required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
3050    }
3051
3052    #[test]
3053    fn duration_second_single_column() {
3054        required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
3055    }
3056
3057    #[test]
3058    fn duration_millisecond_single_column() {
3059        required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
3060    }
3061
3062    #[test]
3063    fn duration_microsecond_single_column() {
3064        required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
3065    }
3066
3067    #[test]
3068    fn duration_nanosecond_single_column() {
3069        required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
3070    }
3071
3072    #[test]
3073    fn interval_year_month_single_column() {
3074        required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
3075    }
3076
3077    #[test]
3078    fn interval_day_time_single_column() {
3079        required_and_optional::<IntervalDayTimeArray, _>(vec![
3080            IntervalDayTime::new(0, 1),
3081            IntervalDayTime::new(0, 3),
3082            IntervalDayTime::new(3, -2),
3083            IntervalDayTime::new(-200, 4),
3084        ]);
3085    }
3086
3087    #[test]
3088    #[should_panic(
3089        expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
3090    )]
3091    fn interval_month_day_nano_single_column() {
3092        required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
3093            IntervalMonthDayNano::new(0, 1, 5),
3094            IntervalMonthDayNano::new(0, 3, 2),
3095            IntervalMonthDayNano::new(3, -2, -5),
3096            IntervalMonthDayNano::new(-200, 4, -1),
3097        ]);
3098    }
3099
3100    #[test]
3101    fn binary_single_column() {
3102        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
3103        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
3104        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
3105
3106        // BinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
3107        values_required::<BinaryArray, _>(many_vecs_iter);
3108    }
3109
3110    #[test]
3111    fn binary_view_single_column() {
3112        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
3113        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
3114        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
3115
3116        // BinaryViewArrays can't be built from Vec<Option<&str>>, so only call `values_required`
3117        values_required::<BinaryViewArray, _>(many_vecs_iter);
3118    }
3119
3120    #[test]
3121    fn i32_column_bloom_filter_at_end() {
3122        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
3123        let mut options = RoundTripOptions::new(array, false);
3124        options.bloom_filter = true;
3125        options.bloom_filter_position = BloomFilterPosition::End;
3126
3127        let files = one_column_roundtrip_with_options(options);
3128        check_bloom_filter(
3129            files,
3130            "col".to_string(),
3131            (0..SMALL_SIZE as i32).collect(),
3132            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
3133        );
3134    }
3135
3136    #[test]
3137    fn i32_column_bloom_filter() {
3138        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
3139        let mut options = RoundTripOptions::new(array, false);
3140        options.bloom_filter = true;
3141
3142        let files = one_column_roundtrip_with_options(options);
3143        check_bloom_filter(
3144            files,
3145            "col".to_string(),
3146            (0..SMALL_SIZE as i32).collect(),
3147            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
3148        );
3149    }
3150
3151    /// Test that bloom filter folding produces correct results even when
3152    /// the configured NDV differs significantly from actual NDV.
3153    /// A large NDV means a larger initial filter that gets folded down;
3154    /// a small NDV means a smaller initial filter.
3155    #[test]
3156    fn i32_column_bloom_filter_fixed_ndv() {
3157        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
3158
3159        // NDV much larger than actual distinct values — tests folding a large filter down
3160        let mut options = RoundTripOptions::new(array.clone(), false);
3161        options.bloom_filter = true;
3162        options.bloom_filter_ndv = Some(1_000_000);
3163
3164        let files = one_column_roundtrip_with_options(options);
3165        check_bloom_filter(
3166            files,
3167            "col".to_string(),
3168            (0..SMALL_SIZE as i32).collect(),
3169            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
3170        );
3171
3172        // NDV smaller than actual distinct values — tests the underestimate path
3173        let mut options = RoundTripOptions::new(array, false);
3174        options.bloom_filter = true;
3175        options.bloom_filter_ndv = Some(3);
3176
3177        let files = one_column_roundtrip_with_options(options);
3178        check_bloom_filter(
3179            files,
3180            "col".to_string(),
3181            (0..SMALL_SIZE as i32).collect(),
3182            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
3183        );
3184    }
3185
3186    #[test]
3187    fn binary_column_bloom_filter() {
3188        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
3189        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
3190        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
3191
3192        let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
3193        let mut options = RoundTripOptions::new(array, false);
3194        options.bloom_filter = true;
3195
3196        let files = one_column_roundtrip_with_options(options);
3197        check_bloom_filter(
3198            files,
3199            "col".to_string(),
3200            many_vecs,
3201            vec![vec![(SMALL_SIZE + 1) as u8]],
3202        );
3203    }
3204
3205    #[test]
3206    fn empty_string_null_column_bloom_filter() {
3207        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
3208        let raw_strs = raw_values.iter().map(|s| s.as_str());
3209
3210        let array = Arc::new(StringArray::from_iter_values(raw_strs));
3211        let mut options = RoundTripOptions::new(array, false);
3212        options.bloom_filter = true;
3213
3214        let files = one_column_roundtrip_with_options(options);
3215
3216        let optional_raw_values: Vec<_> = raw_values
3217            .iter()
3218            .enumerate()
3219            .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
3220            .collect();
3221        // For null slots, the empty string should not be present in the bloom filter.
3222        check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
3223    }
3224
3225    #[test]
3226    fn large_binary_single_column() {
3227        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
3228        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
3229        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
3230
3231        // LargeBinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
3232        values_required::<LargeBinaryArray, _>(many_vecs_iter);
3233    }
3234
3235    #[test]
3236    fn fixed_size_binary_single_column() {
3237        let mut builder = FixedSizeBinaryBuilder::new(4);
3238        builder.append_value(b"0123").unwrap();
3239        builder.append_null();
3240        builder.append_value(b"8910").unwrap();
3241        builder.append_value(b"1112").unwrap();
3242        let array = Arc::new(builder.finish());
3243
3244        one_column_roundtrip(array, true);
3245    }
3246
3247    #[test]
3248    fn string_single_column() {
3249        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
3250        let raw_strs = raw_values.iter().map(|s| s.as_str());
3251
3252        required_and_optional::<StringArray, _>(raw_strs);
3253    }
3254
3255    #[test]
3256    fn large_string_single_column() {
3257        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
3258        let raw_strs = raw_values.iter().map(|s| s.as_str());
3259
3260        required_and_optional::<LargeStringArray, _>(raw_strs);
3261    }
3262
3263    #[test]
3264    fn string_view_single_column() {
3265        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
3266        let raw_strs = raw_values.iter().map(|s| s.as_str());
3267
3268        required_and_optional::<StringViewArray, _>(raw_strs);
3269    }
3270
3271    #[test]
3272    fn null_list_single_column() {
3273        let null_field = Field::new_list_field(DataType::Null, true);
3274        let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);
3275
3276        let schema = Schema::new(vec![list_field]);
3277
3278        // Build [[], null, [null, null]]
3279        let a_values = NullArray::new(2);
3280        let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
3281        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
3282            DataType::Null,
3283            true,
3284        ))))
3285        .len(3)
3286        .add_buffer(a_value_offsets)
3287        .null_bit_buffer(Some(Buffer::from([0b00000101])))
3288        .add_child_data(a_values.into_data())
3289        .build()
3290        .unwrap();
3291
3292        let a = ListArray::from(a_list_data);
3293
3294        assert!(a.is_valid(0));
3295        assert!(!a.is_valid(1));
3296        assert!(a.is_valid(2));
3297
3298        assert_eq!(a.value(0).len(), 0);
3299        assert_eq!(a.value(2).len(), 2);
3300        assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);
3301
3302        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
3303        roundtrip(batch, None);
3304    }
3305
3306    #[test]
3307    fn list_single_column() {
3308        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3309        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
3310        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
3311            DataType::Int32,
3312            false,
3313        ))))
3314        .len(5)
3315        .add_buffer(a_value_offsets)
3316        .null_bit_buffer(Some(Buffer::from([0b00011011])))
3317        .add_child_data(a_values.into_data())
3318        .build()
3319        .unwrap();
3320
3321        assert_eq!(a_list_data.null_count(), 1);
3322
3323        let a = ListArray::from(a_list_data);
3324        let values = Arc::new(a);
3325
3326        one_column_roundtrip(values, true);
3327    }
3328
3329    #[test]
3330    fn large_list_single_column() {
3331        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3332        let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
3333        let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
3334            "large_item",
3335            DataType::Int32,
3336            true,
3337        ))))
3338        .len(5)
3339        .add_buffer(a_value_offsets)
3340        .add_child_data(a_values.into_data())
3341        .null_bit_buffer(Some(Buffer::from([0b00011011])))
3342        .build()
3343        .unwrap();
3344
3345        // The validity buffer 0b00011011 marks index 2 as null, so the null count should be 1
3346        assert_eq!(a_list_data.null_count(), 1);
3347
3348        let a = LargeListArray::from(a_list_data);
3349        let values = Arc::new(a);
3350
3351        one_column_roundtrip(values, true);
3352    }
3353
3354    #[test]
3355    fn list_nested_nulls() {
3356        use arrow::datatypes::Int32Type;
3357        let data = vec![
3358            Some(vec![Some(1)]),
3359            Some(vec![Some(2), Some(3)]),
3360            None,
3361            Some(vec![Some(4), Some(5), None]),
3362            Some(vec![None]),
3363            Some(vec![Some(6), Some(7)]),
3364        ];
3365
3366        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
3367        one_column_roundtrip(Arc::new(list), true);
3368
3369        let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
3370        one_column_roundtrip(Arc::new(list), true);
3371    }
3372
3373    #[test]
3374    fn struct_single_column() {
3375        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3376        let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
3377        let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
3378
3379        let values = Arc::new(s);
3380        one_column_roundtrip(values, false);
3381    }
3382
3383    #[test]
3384    fn list_and_map_coerced_names() {
3385        // Create map and list with non-Parquet naming
3386        let list_field =
3387            Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
3388        let map_field = Field::new_map(
3389            "my_map",
3390            "entries",
3391            Field::new("keys", DataType::Int32, false),
3392            Field::new("values", DataType::Int32, true),
3393            false,
3394            true,
3395        );
3396
3397        let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
3398        let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();
3399
3400        let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
3401
3402        // Write data to Parquet but coerce names to match spec
3403        let props = Some(WriterProperties::builder().set_coerce_types(true).build());
3404        let file = tempfile::tempfile().unwrap();
3405        let mut writer =
3406            ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();
3407
3408        let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
3409        writer.write(&batch).unwrap();
3410        let file_metadata = writer.close().unwrap();
3411
3412        let schema = file_metadata.file_metadata().schema();
3413        // Coerced name of "item" should be "element"
3414        let list_field = &schema.get_fields()[0].get_fields()[0];
3415        assert_eq!(list_field.get_fields()[0].name(), "element");
3416
3417        let map_field = &schema.get_fields()[1].get_fields()[0];
3418        // Coerced name of "entries" should be "key_value"
3419        assert_eq!(map_field.name(), "key_value");
3420        // Coerced name of "keys" should be "key"
3421        assert_eq!(map_field.get_fields()[0].name(), "key");
3422        // Coerced name of "values" should be "value"
3423        assert_eq!(map_field.get_fields()[1].name(), "value");
3424
3425        // Double check schema after reading from the file
3426        let reader = SerializedFileReader::new(file).unwrap();
3427        let file_schema = reader.metadata().file_metadata().schema();
3428        let fields = file_schema.get_fields();
3429        let list_field = &fields[0].get_fields()[0];
3430        assert_eq!(list_field.get_fields()[0].name(), "element");
3431        let map_field = &fields[1].get_fields()[0];
3432        assert_eq!(map_field.name(), "key_value");
3433        assert_eq!(map_field.get_fields()[0].name(), "key");
3434        assert_eq!(map_field.get_fields()[1].name(), "value");
3435    }
3436
3437    #[test]
3438    fn fallback_flush_data_page() {
3439        // Tests that Fallback::flush_data_page clears all buffers correctly
3440        let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
3441        let values = Arc::new(StringArray::from(raw_values));
3442        let encodings = vec![
3443            Encoding::DELTA_BYTE_ARRAY,
3444            Encoding::DELTA_LENGTH_BYTE_ARRAY,
3445        ];
3446        let data_type = values.data_type().clone();
3447        let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
3448        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3449
3450        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
3451        let data_page_size_limit: usize = 32;
3452        let write_batch_size: usize = 16;
3453
3454        for encoding in &encodings {
3455            for row_group_size in row_group_sizes {
3456                let props = WriterProperties::builder()
3457                    .set_writer_version(WriterVersion::PARQUET_2_0)
3458                    .set_max_row_group_row_count(Some(row_group_size))
3459                    .set_dictionary_enabled(false)
3460                    .set_encoding(*encoding)
3461                    .set_data_page_size_limit(data_page_size_limit)
3462                    .set_write_batch_size(write_batch_size)
3463                    .build();
3464
3465                roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
3466                    let string_array_a = StringArray::from(a.clone());
3467                    let string_array_b = StringArray::from(b.clone());
3468                    let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
3469                    let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
3470                    assert_eq!(
3471                        vec_a, vec_b,
3472                        "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
3473                    );
3474                });
3475            }
3476        }
3477    }
3478
3479    #[test]
3480    fn arrow_writer_string_dictionary() {
3481        // define schema
3482        #[allow(deprecated)]
3483        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3484            "dictionary",
3485            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3486            true,
3487            42,
3488            true,
3489        )]));
3490
3491        // create some data
3492        let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3493            .iter()
3494            .copied()
3495            .collect();
3496
3497        // build a record batch
3498        one_column_roundtrip_with_schema(Arc::new(d), schema);
3499    }
3500
3501    #[test]
3502    fn arrow_writer_test_type_compatibility() {
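        // Writes `array1` under its own schema, then writes `array2` under a schema with a
        // different but compatible type, and asserts that reading the file back yields
        // `expected_result` under the original schema.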
3503        fn ensure_compatible_write<T1, T2>(array1: T1, array2: T2, expected_result: T1)
3504        where
3505            T1: Array + 'static,
3506            T2: Array + 'static,
3507        {
3508            let schema1 = Arc::new(Schema::new(vec![Field::new(
3509                "a",
3510                array1.data_type().clone(),
3511                false,
3512            )]));
3513
3514            let file = tempfile().unwrap();
3515            let mut writer =
3516                ArrowWriter::try_new(file.try_clone().unwrap(), schema1.clone(), None).unwrap();
3517
3518            let rb1 = RecordBatch::try_new(schema1.clone(), vec![Arc::new(array1)]).unwrap();
3519            writer.write(&rb1).unwrap();
3520
3521            let schema2 = Arc::new(Schema::new(vec![Field::new(
3522                "a",
3523                array2.data_type().clone(),
3524                false,
3525            )]));
3526            let rb2 = RecordBatch::try_new(schema2, vec![Arc::new(array2)]).unwrap();
3527            writer.write(&rb2).unwrap();
3528
3529            writer.close().unwrap();
3530
3531            let mut record_batch_reader =
3532                ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
3533            let actual_batch = record_batch_reader.next().unwrap().unwrap();
3534
3535            let expected_batch =
3536                RecordBatch::try_new(schema1, vec![Arc::new(expected_result)]).unwrap();
3537            assert_eq!(actual_batch, expected_batch);
3538        }
3539
3540        // check compatibility between native and dictionaries
3541
3542        ensure_compatible_write(
3543            DictionaryArray::new(
3544                UInt8Array::from_iter_values(vec![0]),
3545                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3546            ),
3547            StringArray::from_iter_values(vec!["barquet"]),
3548            DictionaryArray::new(
3549                UInt8Array::from_iter_values(vec![0, 1]),
3550                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3551            ),
3552        );
3553
3554        ensure_compatible_write(
3555            StringArray::from_iter_values(vec!["parquet"]),
3556            DictionaryArray::new(
3557                UInt8Array::from_iter_values(vec![0]),
3558                Arc::new(StringArray::from_iter_values(vec!["barquet"])),
3559            ),
3560            StringArray::from_iter_values(vec!["parquet", "barquet"]),
3561        );
3562
3563        // check compatibility between dictionaries with different key types
3564
3565        ensure_compatible_write(
3566            DictionaryArray::new(
3567                UInt8Array::from_iter_values(vec![0]),
3568                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3569            ),
3570            DictionaryArray::new(
3571                UInt16Array::from_iter_values(vec![0]),
3572                Arc::new(StringArray::from_iter_values(vec!["barquet"])),
3573            ),
3574            DictionaryArray::new(
3575                UInt8Array::from_iter_values(vec![0, 1]),
3576                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3577            ),
3578        );
3579
3580        // check compatibility between dictionaries with different value types
3581        ensure_compatible_write(
3582            DictionaryArray::new(
3583                UInt8Array::from_iter_values(vec![0]),
3584                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3585            ),
3586            DictionaryArray::new(
3587                UInt8Array::from_iter_values(vec![0]),
3588                Arc::new(LargeStringArray::from_iter_values(vec!["barquet"])),
3589            ),
3590            DictionaryArray::new(
3591                UInt8Array::from_iter_values(vec![0, 1]),
3592                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3593            ),
3594        );
3595
3596        // check compatibility between a dictionary and a native array with a different type
3597        ensure_compatible_write(
3598            DictionaryArray::new(
3599                UInt8Array::from_iter_values(vec![0]),
3600                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3601            ),
3602            LargeStringArray::from_iter_values(vec!["barquet"]),
3603            DictionaryArray::new(
3604                UInt8Array::from_iter_values(vec![0, 1]),
3605                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3606            ),
3607        );
3608
3609        // check compatibility for string types
3610
3611        ensure_compatible_write(
3612            StringArray::from_iter_values(vec!["parquet"]),
3613            LargeStringArray::from_iter_values(vec!["barquet"]),
3614            StringArray::from_iter_values(vec!["parquet", "barquet"]),
3615        );
3616
3617        ensure_compatible_write(
3618            LargeStringArray::from_iter_values(vec!["parquet"]),
3619            StringArray::from_iter_values(vec!["barquet"]),
3620            LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
3621        );
3622
3623        ensure_compatible_write(
3624            StringArray::from_iter_values(vec!["parquet"]),
3625            StringViewArray::from_iter_values(vec!["barquet"]),
3626            StringArray::from_iter_values(vec!["parquet", "barquet"]),
3627        );
3628
3629        ensure_compatible_write(
3630            StringViewArray::from_iter_values(vec!["parquet"]),
3631            StringArray::from_iter_values(vec!["barquet"]),
3632            StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
3633        );
3634
3635        ensure_compatible_write(
3636            LargeStringArray::from_iter_values(vec!["parquet"]),
3637            StringViewArray::from_iter_values(vec!["barquet"]),
3638            LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
3639        );
3640
3641        ensure_compatible_write(
3642            StringViewArray::from_iter_values(vec!["parquet"]),
3643            LargeStringArray::from_iter_values(vec!["barquet"]),
3644            StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
3645        );
3646
3647        // check compatibility for binary types
3648
3649        ensure_compatible_write(
3650            BinaryArray::from_iter_values(vec![b"parquet"]),
3651            LargeBinaryArray::from_iter_values(vec![b"barquet"]),
3652            BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3653        );
3654
3655        ensure_compatible_write(
3656            LargeBinaryArray::from_iter_values(vec![b"parquet"]),
3657            BinaryArray::from_iter_values(vec![b"barquet"]),
3658            LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3659        );
3660
3661        ensure_compatible_write(
3662            BinaryArray::from_iter_values(vec![b"parquet"]),
3663            BinaryViewArray::from_iter_values(vec![b"barquet"]),
3664            BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3665        );
3666
3667        ensure_compatible_write(
3668            BinaryViewArray::from_iter_values(vec![b"parquet"]),
3669            BinaryArray::from_iter_values(vec![b"barquet"]),
3670            BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
3671        );
3672
3673        ensure_compatible_write(
3674            BinaryViewArray::from_iter_values(vec![b"parquet"]),
3675            LargeBinaryArray::from_iter_values(vec![b"barquet"]),
3676            BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
3677        );
3678
3679        ensure_compatible_write(
3680            LargeBinaryArray::from_iter_values(vec![b"parquet"]),
3681            BinaryViewArray::from_iter_values(vec![b"barquet"]),
3682            LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3683        );
3684
3685        // check compatibility for list types
3686
3687        let list_field_metadata = HashMap::from_iter(vec![(
3688            PARQUET_FIELD_ID_META_KEY.to_string(),
3689            "1".to_string(),
3690        )]);
3691        let list_field = Field::new_list_field(DataType::Int32, false);
3692
3693        let values1 = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4]));
3694        let offsets1 = OffsetBuffer::new(vec![0, 2, 5].into());
3695
3696        let values2 = Arc::new(Int32Array::from(vec![5, 6, 7, 8, 9]));
3697        let offsets2 = OffsetBuffer::new(vec![0, 3, 5].into());
3698
3699        let values_expected = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]));
3700        let offsets_expected = OffsetBuffer::new(vec![0, 2, 5, 8, 10].into());
3701
3702        ensure_compatible_write(
3703            // when the initial schema has the metadata ...
3704            ListArray::try_new(
3705                Arc::new(
3706                    list_field
3707                        .clone()
3708                        .with_metadata(list_field_metadata.clone()),
3709                ),
3710                offsets1,
3711                values1,
3712                None,
3713            )
3714            .unwrap(),
3715            // ... and some intermediate schema doesn't have the metadata
3716            ListArray::try_new(Arc::new(list_field.clone()), offsets2, values2, None).unwrap(),
3717            // ... the write will still go through, and the resulting schema will inherit the initial metadata
3718            ListArray::try_new(
3719                Arc::new(
3720                    list_field
3721                        .clone()
3722                        .with_metadata(list_field_metadata.clone()),
3723                ),
3724                offsets_expected,
3725                values_expected,
3726                None,
3727            )
3728            .unwrap(),
3729        );
3730    }
3731
3732    #[test]
3733    fn arrow_writer_primitive_dictionary() {
3734        // define schema
3735        #[allow(deprecated)]
3736        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3737            "dictionary",
3738            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
3739            true,
3740            42,
3741            true,
3742        )]));
3743
3744        // create some data
3745        let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
3746        builder.append(12345678).unwrap();
3747        builder.append_null();
3748        builder.append(22345678).unwrap();
3749        builder.append(12345678).unwrap();
3750        let d = builder.finish();
3751
3752        one_column_roundtrip_with_schema(Arc::new(d), schema);
3753    }
3754
3755    #[test]
3756    fn arrow_writer_decimal32_dictionary() {
3757        let integers = vec![12345, 56789, 34567];
3758
3759        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3760
3761        let values = Decimal32Array::from(integers.clone())
3762            .with_precision_and_scale(5, 2)
3763            .unwrap();
3764
3765        let array = DictionaryArray::new(keys, Arc::new(values));
3766        one_column_roundtrip(Arc::new(array.clone()), true);
3767
3768        let values = Decimal32Array::from(integers)
3769            .with_precision_and_scale(9, 2)
3770            .unwrap();
3771
3772        let array = array.with_values(Arc::new(values));
3773        one_column_roundtrip(Arc::new(array), true);
3774    }
3775
3776    #[test]
3777    fn arrow_writer_decimal64_dictionary() {
3778        let integers = vec![12345, 56789, 34567];
3779
3780        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3781
3782        let values = Decimal64Array::from(integers.clone())
3783            .with_precision_and_scale(5, 2)
3784            .unwrap();
3785
3786        let array = DictionaryArray::new(keys, Arc::new(values));
3787        one_column_roundtrip(Arc::new(array.clone()), true);
3788
3789        let values = Decimal64Array::from(integers)
3790            .with_precision_and_scale(12, 2)
3791            .unwrap();
3792
3793        let array = array.with_values(Arc::new(values));
3794        one_column_roundtrip(Arc::new(array), true);
3795    }
3796
3797    #[test]
3798    fn arrow_writer_decimal128_dictionary() {
3799        let integers = vec![12345, 56789, 34567];
3800
3801        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3802
3803        let values = Decimal128Array::from(integers.clone())
3804            .with_precision_and_scale(5, 2)
3805            .unwrap();
3806
3807        let array = DictionaryArray::new(keys, Arc::new(values));
3808        one_column_roundtrip(Arc::new(array.clone()), true);
3809
3810        let values = Decimal128Array::from(integers)
3811            .with_precision_and_scale(12, 2)
3812            .unwrap();
3813
3814        let array = array.with_values(Arc::new(values));
3815        one_column_roundtrip(Arc::new(array), true);
3816    }
3817
3818    #[test]
3819    fn arrow_writer_decimal256_dictionary() {
3820        let integers = vec![
3821            i256::from_i128(12345),
3822            i256::from_i128(56789),
3823            i256::from_i128(34567),
3824        ];
3825
3826        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3827
3828        let values = Decimal256Array::from(integers.clone())
3829            .with_precision_and_scale(5, 2)
3830            .unwrap();
3831
3832        let array = DictionaryArray::new(keys, Arc::new(values));
3833        one_column_roundtrip(Arc::new(array.clone()), true);
3834
3835        let values = Decimal256Array::from(integers)
3836            .with_precision_and_scale(12, 2)
3837            .unwrap();
3838
3839        let array = array.with_values(Arc::new(values));
3840        one_column_roundtrip(Arc::new(array), true);
3841    }
3842
3843    #[test]
3844    fn arrow_writer_string_dictionary_unsigned_index() {
3845        // define schema
3846        #[allow(deprecated)]
3847        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3848            "dictionary",
3849            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3850            true,
3851            42,
3852            true,
3853        )]));
3854
3855        // create some data
3856        let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3857            .iter()
3858            .copied()
3859            .collect();
3860
3861        one_column_roundtrip_with_schema(Arc::new(d), schema);
3862    }
3863
3864    #[test]
3865    fn u32_min_max() {
3866        // check values roundtrip through parquet
3867        let src = [
3868            u32::MIN,
3869            u32::MIN + 1,
3870            (i32::MAX as u32) - 1,
3871            i32::MAX as u32,
3872            (i32::MAX as u32) + 1,
3873            u32::MAX - 1,
3874            u32::MAX,
3875        ];
3876        let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
3877        let files = one_column_roundtrip(values, false);
3878
3879        for file in files {
3880            // check statistics are valid
3881            let reader = SerializedFileReader::new(file).unwrap();
3882            let metadata = reader.metadata();
3883
3884            let mut row_offset = 0;
3885            for row_group in metadata.row_groups() {
3886                assert_eq!(row_group.num_columns(), 1);
3887                let column = row_group.column(0);
3888
3889                let num_values = column.num_values() as usize;
3890                let src_slice = &src[row_offset..row_offset + num_values];
3891                row_offset += column.num_values() as usize;
3892
3893                let stats = column.statistics().unwrap();
3894                if let Statistics::Int32(stats) = stats {
3895                    assert_eq!(
3896                        *stats.min_opt().unwrap() as u32,
3897                        *src_slice.iter().min().unwrap()
3898                    );
3899                    assert_eq!(
3900                        *stats.max_opt().unwrap() as u32,
3901                        *src_slice.iter().max().unwrap()
3902                    );
3903                } else {
3904                    panic!("Statistics::Int32 missing")
3905                }
3906            }
3907        }
3908    }
3909
3910    #[test]
3911    fn u64_min_max() {
3912        // check values roundtrip through parquet
3913        let src = [
3914            u64::MIN,
3915            u64::MIN + 1,
3916            (i64::MAX as u64) - 1,
3917            i64::MAX as u64,
3918            (i64::MAX as u64) + 1,
3919            u64::MAX - 1,
3920            u64::MAX,
3921        ];
3922        let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
3923        let files = one_column_roundtrip(values, false);
3924
3925        for file in files {
3926            // check statistics are valid
3927            let reader = SerializedFileReader::new(file).unwrap();
3928            let metadata = reader.metadata();
3929
3930            let mut row_offset = 0;
3931            for row_group in metadata.row_groups() {
3932                assert_eq!(row_group.num_columns(), 1);
3933                let column = row_group.column(0);
3934
3935                let num_values = column.num_values() as usize;
3936                let src_slice = &src[row_offset..row_offset + num_values];
3937                row_offset += column.num_values() as usize;
3938
3939                let stats = column.statistics().unwrap();
3940                if let Statistics::Int64(stats) = stats {
3941                    assert_eq!(
3942                        *stats.min_opt().unwrap() as u64,
3943                        *src_slice.iter().min().unwrap()
3944                    );
3945                    assert_eq!(
3946                        *stats.max_opt().unwrap() as u64,
3947                        *src_slice.iter().max().unwrap()
3948                    );
3949                } else {
3950                    panic!("Statistics::Int64 missing")
3951                }
3952            }
3953        }
3954    }
3955
3956    #[test]
3957    fn statistics_null_counts_only_nulls() {
3958        // check that null-count statistics for "only NULL"-columns are correct
3959        let values = Arc::new(UInt64Array::from(vec![None, None]));
3960        let files = one_column_roundtrip(values, true);
3961
3962        for file in files {
3963            // check statistics are valid
3964            let reader = SerializedFileReader::new(file).unwrap();
3965            let metadata = reader.metadata();
3966            assert_eq!(metadata.num_row_groups(), 1);
3967            let row_group = metadata.row_group(0);
3968            assert_eq!(row_group.num_columns(), 1);
3969            let column = row_group.column(0);
3970            let stats = column.statistics().unwrap();
3971            assert_eq!(stats.null_count_opt(), Some(2));
3972        }
3973    }
3974
3975    #[test]
3976    fn test_list_of_struct_roundtrip() {
3977        // define schema
3978        let int_field = Field::new("a", DataType::Int32, true);
3979        let int_field2 = Field::new("b", DataType::Int32, true);
3980
3981        let int_builder = Int32Builder::with_capacity(10);
3982        let int_builder2 = Int32Builder::with_capacity(10);
3983
3984        let struct_builder = StructBuilder::new(
3985            vec![int_field, int_field2],
3986            vec![Box::new(int_builder), Box::new(int_builder2)],
3987        );
3988        let mut list_builder = ListBuilder::new(struct_builder);
3989
3990        // Construct the following array
3991        // [{a: 1, b: 2}], [], null, [null, null], [{a: null, b: 3}], [{a: 2, b: null}]
3992
3993        // [{a: 1, b: 2}]
3994        let values = list_builder.values();
3995        values
3996            .field_builder::<Int32Builder>(0)
3997            .unwrap()
3998            .append_value(1);
3999        values
4000            .field_builder::<Int32Builder>(1)
4001            .unwrap()
4002            .append_value(2);
4003        values.append(true);
4004        list_builder.append(true);
4005
4006        // []
4007        list_builder.append(true);
4008
4009        // null
4010        list_builder.append(false);
4011
4012        // [null, null]
4013        let values = list_builder.values();
4014        values
4015            .field_builder::<Int32Builder>(0)
4016            .unwrap()
4017            .append_null();
4018        values
4019            .field_builder::<Int32Builder>(1)
4020            .unwrap()
4021            .append_null();
4022        values.append(false);
4023        values
4024            .field_builder::<Int32Builder>(0)
4025            .unwrap()
4026            .append_null();
4027        values
4028            .field_builder::<Int32Builder>(1)
4029            .unwrap()
4030            .append_null();
4031        values.append(false);
4032        list_builder.append(true);
4033
4034        // [{a: null, b: 3}]
4035        let values = list_builder.values();
4036        values
4037            .field_builder::<Int32Builder>(0)
4038            .unwrap()
4039            .append_null();
4040        values
4041            .field_builder::<Int32Builder>(1)
4042            .unwrap()
4043            .append_value(3);
4044        values.append(true);
4045        list_builder.append(true);
4046
4047        // [{a: 2, b: null}]
4048        let values = list_builder.values();
4049        values
4050            .field_builder::<Int32Builder>(0)
4051            .unwrap()
4052            .append_value(2);
4053        values
4054            .field_builder::<Int32Builder>(1)
4055            .unwrap()
4056            .append_null();
4057        values.append(true);
4058        list_builder.append(true);
4059
4060        let array = Arc::new(list_builder.finish());
4061
4062        one_column_roundtrip(array, true);
4063    }
4064
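    /// Returns the number of rows in each row group of the given metadata.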
4065    fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
4066        metadata.row_groups().iter().map(|x| x.num_rows()).collect()
4067    }
4068
4069    #[test]
4070    fn test_aggregates_records() {
4071        let arrays = [
4072            Int32Array::from((0..100).collect::<Vec<_>>()),
4073            Int32Array::from((0..50).collect::<Vec<_>>()),
4074            Int32Array::from((200..500).collect::<Vec<_>>()),
4075        ];
4076
4077        let schema = Arc::new(Schema::new(vec![Field::new(
4078            "int",
4079            ArrowDataType::Int32,
4080            false,
4081        )]));
4082
4083        let file = tempfile::tempfile().unwrap();
4084
4085        let props = WriterProperties::builder()
4086            .set_max_row_group_row_count(Some(200))
4087            .build();
4088
4089        let mut writer =
4090            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
4091
4092        for array in arrays {
4093            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
4094            writer.write(&batch).unwrap();
4095        }
4096
4097        writer.close().unwrap();
4098
4099        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4100        assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);
4101
4102        let batches = builder
4103            .with_batch_size(100)
4104            .build()
4105            .unwrap()
4106            .collect::<ArrowResult<Vec<_>>>()
4107            .unwrap();
4108
4109        assert_eq!(batches.len(), 5);
4110        assert!(batches.iter().all(|x| x.num_columns() == 1));
4111
4112        let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
4113
4114        assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
4115
4116        let values: Vec<_> = batches
4117            .iter()
4118            .flat_map(|x| {
4119                x.column(0)
4120                    .as_any()
4121                    .downcast_ref::<Int32Array>()
4122                    .unwrap()
4123                    .values()
4124                    .iter()
4125                    .cloned()
4126            })
4127            .collect();
4128
4129        let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
4130        assert_eq!(&values, &expected_values)
4131    }
4132
4133    #[test]
4134    fn complex_aggregate() {
4135        // Tests aggregating nested data
4136        let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
4137        let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
4138        let struct_a = Arc::new(Field::new(
4139            "struct_a",
4140            DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
4141            true,
4142        ));
4143
4144        let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
4145        let struct_b = Arc::new(Field::new(
4146            "struct_b",
4147            DataType::Struct(vec![list_a.clone()].into()),
4148            false,
4149        ));
4150
4151        let schema = Arc::new(Schema::new(vec![struct_b]));
4152
4153        // create nested data
4154        let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
4155        let field_b_array =
4156            Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);
4157
4158        let struct_a_array = StructArray::from(vec![
4159            (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
4160            (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
4161        ]);
4162
4163        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
4164            .len(5)
4165            .add_buffer(Buffer::from_iter(vec![
4166                0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
4167            ]))
4168            .null_bit_buffer(Some(Buffer::from_iter(vec![
4169                true, false, true, false, true,
4170            ])))
4171            .child_data(vec![struct_a_array.into_data()])
4172            .build()
4173            .unwrap();
4174
4175        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
4176        let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);
4177
4178        let batch1 =
4179            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
4180                .unwrap();
4181
4182        let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
4183        let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);
4184
4185        let struct_a_array = StructArray::from(vec![
4186            (field_a, Arc::new(field_a_array) as ArrayRef),
4187            (field_b, Arc::new(field_b_array) as ArrayRef),
4188        ]);
4189
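        // Offsets [0, 4, 5] define two non-null lists of 4 and 1 elements respectively.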
4190        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
4191            .len(2)
4192            .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
4193            .child_data(vec![struct_a_array.into_data()])
4194            .build()
4195            .unwrap();
4196
4197        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
4198        let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);
4199
4200        let batch2 =
4201            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
4202                .unwrap();
4203
4204        let batches = &[batch1, batch2];
4205
4206        // Verify data is as expected
4207
4208        let expected = r#"
4209            +-------------------------------------------------------------------------------------------------------+
4210            | struct_b                                                                                              |
4211            +-------------------------------------------------------------------------------------------------------+
4212            | {list: [{leaf_a: 1, leaf_b: 1}]}                                                                      |
4213            | {list: }                                                                                              |
4214            | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]}                                               |
4215            | {list: }                                                                                              |
4216            | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]}                                                |
4217            | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
4218            | {list: [{leaf_a: 10, leaf_b: }]}                                                                      |
4219            +-------------------------------------------------------------------------------------------------------+
4220        "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");
4221
4222        let actual = pretty_format_batches(batches).unwrap().to_string();
4223        assert_eq!(actual, expected);
4224
4225        // Write data
4226        let file = tempfile::tempfile().unwrap();
4227        let props = WriterProperties::builder()
4228            .set_max_row_group_row_count(Some(6))
4229            .build();
4230
4231        let mut writer =
4232            ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
4233
4234        for batch in batches {
4235            writer.write(batch).unwrap();
4236        }
4237        writer.close().unwrap();
4238
4239        // Read Data
4240        // Should have written entire first batch and first row of second to the first row group
4241        // leaving a single row in the second row group
4242
4243        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4244        assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);
4245
4246        let batches = builder
4247            .with_batch_size(2)
4248            .build()
4249            .unwrap()
4250            .collect::<ArrowResult<Vec<_>>>()
4251            .unwrap();
4252
4253        assert_eq!(batches.len(), 4);
4254        let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
4255        assert_eq!(&batch_counts, &[2, 2, 2, 1]);
4256
4257        let actual = pretty_format_batches(&batches).unwrap().to_string();
4258        assert_eq!(actual, expected);
4259    }
4260
4261    #[test]
4262    fn test_arrow_writer_metadata() {
4263        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
4264        let file_schema = batch_schema.clone().with_metadata(
4265            vec![("foo".to_string(), "bar".to_string())]
4266                .into_iter()
4267                .collect(),
4268        );
4269
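        // A batch whose schema matches the file schema except for the metadata
        // should be accepted by the writer.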
4270        let batch = RecordBatch::try_new(
4271            Arc::new(batch_schema),
4272            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4273        )
4274        .unwrap();
4275
4276        let mut buf = Vec::with_capacity(1024);
4277        let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
4278        writer.write(&batch).unwrap();
4279        writer.close().unwrap();
4280    }
4281
4282    #[test]
4283    fn test_arrow_writer_nullable() {
4284        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
4285        let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
4286        let file_schema = Arc::new(file_schema);
4287
4288        let batch = RecordBatch::try_new(
4289            Arc::new(batch_schema),
4290            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4291        )
4292        .unwrap();
4293
4294        let mut buf = Vec::with_capacity(1024);
4295        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
4296        writer.write(&batch).unwrap();
4297        writer.close().unwrap();
4298
4299        let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
4300        let back = read.next().unwrap().unwrap();
4301        assert_eq!(back.schema(), file_schema);
4302        assert_ne!(back.schema(), batch.schema());
4303        assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
4304    }
4305
4306    #[test]
4307    fn in_progress_accounting() {
4308        // define schema
4309        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
4310
4311        // create some data
4312        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
4313
4314        // build a record batch
4315        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
4316
4317        let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
4318
4319        // starts empty
4320        assert_eq!(writer.in_progress_size(), 0);
4321        assert_eq!(writer.in_progress_rows(), 0);
4322        assert_eq!(writer.memory_size(), 0);
4323        assert_eq!(writer.bytes_written(), 4); // Initial 4-byte "PAR1" header
4324        writer.write(&batch).unwrap();
4325
4326        // updated on write
4327        let initial_size = writer.in_progress_size();
4328        assert!(initial_size > 0);
4329        assert_eq!(writer.in_progress_rows(), 5);
4330        let initial_memory = writer.memory_size();
4331        assert!(initial_memory > 0);
4332        // memory estimate is at least as large as the estimated encoded size
4333        assert!(
4334            initial_size <= initial_memory,
4335            "{initial_size} <= {initial_memory}"
4336        );
4337
4338        // updated on second write
4339        writer.write(&batch).unwrap();
4340        assert!(writer.in_progress_size() > initial_size);
4341        assert_eq!(writer.in_progress_rows(), 10);
4342        assert!(writer.memory_size() > initial_memory);
4343        assert!(
4344            writer.in_progress_size() <= writer.memory_size(),
4345            "in_progress_size {} <= memory_size {}",
4346            writer.in_progress_size(),
4347            writer.memory_size()
4348        );
4349
4350        // in progress tracking is cleared, but the overall data written is updated
4351        let pre_flush_bytes_written = writer.bytes_written();
4352        writer.flush().unwrap();
4353        assert_eq!(writer.in_progress_size(), 0);
4354        assert_eq!(writer.memory_size(), 0);
4355        assert!(writer.bytes_written() > pre_flush_bytes_written);
4356
4357        writer.close().unwrap();
4358    }
4359
4360    #[test]
4361    fn test_writer_all_null() {
4362        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
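        // column "b" contains only nulls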
4363        let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
4364        let batch = RecordBatch::try_from_iter(vec![
4365            ("a", Arc::new(a) as ArrayRef),
4366            ("b", Arc::new(b) as ArrayRef),
4367        ])
4368        .unwrap();
4369
4370        let mut buf = Vec::with_capacity(1024);
4371        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
4372        writer.write(&batch).unwrap();
4373        writer.close().unwrap();
4374
4375        let bytes = Bytes::from(buf);
4376        let options = ReadOptionsBuilder::new().with_page_index().build();
4377        let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
4378        let index = reader.metadata().offset_index().unwrap();
4379
4380        assert_eq!(index.len(), 1);
4381        assert_eq!(index[0].len(), 2); // 2 columns
4382        assert_eq!(index[0][0].page_locations().len(), 1); // 1 page
4383        assert_eq!(index[0][1].page_locations().len(), 1); // 1 page
4384    }
4385
4386    #[test]
4387    fn test_disabled_statistics_with_page() {
4388        let file_schema = Schema::new(vec![
4389            Field::new("a", DataType::Utf8, true),
4390            Field::new("b", DataType::Utf8, true),
4391        ]);
4392        let file_schema = Arc::new(file_schema);
4393
4394        let batch = RecordBatch::try_new(
4395            file_schema.clone(),
4396            vec![
4397                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
4398                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
4399            ],
4400        )
4401        .unwrap();
4402
4403        let props = WriterProperties::builder()
4404            .set_statistics_enabled(EnabledStatistics::None)
4405            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
4406            .build();
4407
4408        let mut buf = Vec::with_capacity(1024);
4409        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
4410        writer.write(&batch).unwrap();
4411
4412        let metadata = writer.close().unwrap();
4413        assert_eq!(metadata.num_row_groups(), 1);
4414        let row_group = metadata.row_group(0);
4415        assert_eq!(row_group.num_columns(), 2);
4416        // Column "a" has both offset and column index, as requested
4417        assert!(row_group.column(0).offset_index_offset().is_some());
4418        assert!(row_group.column(0).column_index_offset().is_some());
4419        // Column "b" should only have offset index
4420        assert!(row_group.column(1).offset_index_offset().is_some());
4421        assert!(row_group.column(1).column_index_offset().is_none());
4422
4423        let options = ReadOptionsBuilder::new().with_page_index().build();
4424        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
4425
4426        let row_group = reader.get_row_group(0).unwrap();
4427        let a_col = row_group.metadata().column(0);
4428        let b_col = row_group.metadata().column(1);
4429
4430        // Column chunk of column "a" should have chunk level statistics
4431        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
4432            let min = byte_array_stats.min_opt().unwrap();
4433            let max = byte_array_stats.max_opt().unwrap();
4434
4435            assert_eq!(min.as_bytes(), b"a");
4436            assert_eq!(max.as_bytes(), b"d");
4437        } else {
4438            panic!("expecting Statistics::ByteArray");
4439        }
4440
4441        // The column chunk for column "b" shouldn't have statistics
4442        assert!(b_col.statistics().is_none());
4443
4444        let offset_index = reader.metadata().offset_index().unwrap();
4445        assert_eq!(offset_index.len(), 1); // 1 row group
4446        assert_eq!(offset_index[0].len(), 2); // 2 columns
4447
4448        let column_index = reader.metadata().column_index().unwrap();
4449        assert_eq!(column_index.len(), 1); // 1 row group
4450        assert_eq!(column_index[0].len(), 2); // 2 columns
4451
4452        let a_idx = &column_index[0][0];
4453        assert!(
4454            matches!(a_idx, ColumnIndexMetaData::BYTE_ARRAY(_)),
4455            "{a_idx:?}"
4456        );
4457        let b_idx = &column_index[0][1];
4458        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
4459    }
4460
4461    #[test]
4462    fn test_disabled_statistics_with_chunk() {
4463        let file_schema = Schema::new(vec![
4464            Field::new("a", DataType::Utf8, true),
4465            Field::new("b", DataType::Utf8, true),
4466        ]);
4467        let file_schema = Arc::new(file_schema);
4468
4469        let batch = RecordBatch::try_new(
4470            file_schema.clone(),
4471            vec![
4472                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
4473                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
4474            ],
4475        )
4476        .unwrap();
4477
4478        let props = WriterProperties::builder()
4479            .set_statistics_enabled(EnabledStatistics::None)
4480            .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
4481            .build();
4482
4483        let mut buf = Vec::with_capacity(1024);
4484        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
4485        writer.write(&batch).unwrap();
4486
4487        let metadata = writer.close().unwrap();
4488        assert_eq!(metadata.num_row_groups(), 1);
4489        let row_group = metadata.row_group(0);
4490        assert_eq!(row_group.num_columns(), 2);
4491        // Column "a" should only have offset index
4492        assert!(row_group.column(0).offset_index_offset().is_some());
4493        assert!(row_group.column(0).column_index_offset().is_none());
4494        // Column "b" should only have offset index
4495        assert!(row_group.column(1).offset_index_offset().is_some());
4496        assert!(row_group.column(1).column_index_offset().is_none());
4497
4498        let options = ReadOptionsBuilder::new().with_page_index().build();
4499        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
4500
4501        let row_group = reader.get_row_group(0).unwrap();
4502        let a_col = row_group.metadata().column(0);
4503        let b_col = row_group.metadata().column(1);
4504
4505        // Column chunk of column "a" should have chunk level statistics
4506        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
4507            let min = byte_array_stats.min_opt().unwrap();
4508            let max = byte_array_stats.max_opt().unwrap();
4509
4510            assert_eq!(min.as_bytes(), b"a");
4511            assert_eq!(max.as_bytes(), b"d");
4512        } else {
4513            panic!("expecting Statistics::ByteArray");
4514        }
4515
4516        // The column chunk for column "b" shouldn't have statistics
4517        assert!(b_col.statistics().is_none());
4518
4519        let column_index = reader.metadata().column_index().unwrap();
4520        assert_eq!(column_index.len(), 1); // 1 row group
4521        assert_eq!(column_index[0].len(), 2); // 2 columns
4522
4523        let a_idx = &column_index[0][0];
4524        assert!(matches!(a_idx, ColumnIndexMetaData::NONE), "{a_idx:?}");
4525        let b_idx = &column_index[0][1];
4526        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
4527    }
4528
4529    #[test]
4530    fn test_arrow_writer_skip_metadata() {
4531        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
4532        let file_schema = Arc::new(batch_schema.clone());
4533
4534        let batch = RecordBatch::try_new(
4535            Arc::new(batch_schema),
4536            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4537        )
4538        .unwrap();
4539        let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
4540
4541        let mut buf = Vec::with_capacity(1024);
4542        let mut writer =
4543            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
4544        writer.write(&batch).unwrap();
4545        writer.close().unwrap();
4546
4547        let bytes = Bytes::from(buf);
4548        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
4549        assert_eq!(file_schema, *reader_builder.schema());
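        // with skip_arrow_metadata, the serialized Arrow schema must not appear
        // in the file's key/value metadata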
4550        if let Some(key_value_metadata) = reader_builder
4551            .metadata()
4552            .file_metadata()
4553            .key_value_metadata()
4554        {
4555            assert!(
4556                !key_value_metadata
4557                    .iter()
4558                    .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY)
4559            );
4560        }
4561    }
4562
4563    #[test]
4564    fn mismatched_schemas() {
4565        let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
4566        let file_schema = Arc::new(Schema::new(vec![Field::new(
4567            "temperature",
4568            DataType::Float64,
4569            false,
4570        )]));
4571
4572        let batch = RecordBatch::try_new(
4573            Arc::new(batch_schema),
4574            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4575        )
4576        .unwrap();
4577
4578        let mut buf = Vec::with_capacity(1024);
4579        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
4580
4581        let err = writer.write(&batch).unwrap_err().to_string();
4582        assert_eq!(
4583            err,
4584            "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
4585        );
4586    }
4587
4588    #[test]
4589    // https://github.com/apache/arrow-rs/issues/6988
4590    fn test_roundtrip_empty_schema() {
4591        // create empty record batch with empty schema
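        // (the row count must be given explicitly since there are no columns to infer it from)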
4592        let empty_batch = RecordBatch::try_new_with_options(
4593            Arc::new(Schema::empty()),
4594            vec![],
4595            &RecordBatchOptions::default().with_row_count(Some(0)),
4596        )
4597        .unwrap();
4598
4599        // write to parquet
4600        let mut parquet_bytes: Vec<u8> = Vec::new();
4601        let mut writer =
4602            ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
4603        writer.write(&empty_batch).unwrap();
4604        writer.close().unwrap();
4605
4606        // read from parquet
4607        let bytes = Bytes::from(parquet_bytes);
4608        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
4609        assert_eq!(reader.schema(), &empty_batch.schema());
4610        let batches: Vec<_> = reader
4611            .build()
4612            .unwrap()
4613            .collect::<ArrowResult<Vec<_>>>()
4614            .unwrap();
4615        assert_eq!(batches.len(), 0);
4616    }
4617
4618    #[test]
4619    fn test_page_stats_not_written_by_default() {
4620        let string_field = Field::new("a", DataType::Utf8, false);
4621        let schema = Schema::new(vec![string_field]);
4622        let raw_string_values = vec!["Blart Versenwald III"];
4623        let string_values = StringArray::from(raw_string_values.clone());
4624        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();
4625
4626        let props = WriterProperties::builder()
4627            .set_statistics_enabled(EnabledStatistics::Page)
4628            .set_dictionary_enabled(false)
4629            .set_encoding(Encoding::PLAIN)
4630            .set_compression(crate::basic::Compression::UNCOMPRESSED)
4631            .build();
4632
4633        let file = roundtrip_opts(&batch, props);
4634
4635        // read file and decode page headers
4636        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
4637
4638        // decode first page header
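        // the first page header starts immediately after the 4-byte "PAR1" magic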
4639        let first_page = &file[4..];
4640        let mut prot = ThriftSliceInputProtocol::new(first_page);
4641        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
4642        let stats = hdr.data_page_header.unwrap().statistics;
4643
4644        assert!(stats.is_none());
4645    }
4646
4647    #[test]
4648    fn test_page_stats_when_enabled() {
4649        let string_field = Field::new("a", DataType::Utf8, false);
4650        let schema = Schema::new(vec![string_field]);
4651        let raw_string_values = vec!["Blart Versenwald III", "Andrew Lamb"];
4652        let string_values = StringArray::from(raw_string_values.clone());
4653        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();
4654
4655        let props = WriterProperties::builder()
4656            .set_statistics_enabled(EnabledStatistics::Page)
4657            .set_dictionary_enabled(false)
4658            .set_encoding(Encoding::PLAIN)
4659            .set_write_page_header_statistics(true)
4660            .set_compression(crate::basic::Compression::UNCOMPRESSED)
4661            .build();
4662
4663        let file = roundtrip_opts(&batch, props);
4664
4665        // read file and decode page headers
4666        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
4667
4668        // decode first page header
4669        let first_page = &file[4..];
4670        let mut prot = ThriftSliceInputProtocol::new(first_page);
4671        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
4672        let stats = hdr.data_page_header.unwrap().statistics;
4673
4674        let stats = stats.unwrap();
4675        // check that min/max were actually written to the page
4676        // ("Andrew Lamb" sorts before "Blart Versenwald III", so it is the min)
4676        assert!(stats.is_max_value_exact.unwrap());
4677        assert!(stats.is_min_value_exact.unwrap());
4678        assert_eq!(stats.max_value.unwrap(), "Blart Versenwald III".as_bytes());
4679        assert_eq!(stats.min_value.unwrap(), "Andrew Lamb".as_bytes());
4680    }
4681
4682    #[test]
4683    fn test_page_stats_truncation() {
4684        let string_field = Field::new("a", DataType::Utf8, false);
4685        let binary_field = Field::new("b", DataType::Binary, false);
4686        let schema = Schema::new(vec![string_field, binary_field]);
4687
4688        let raw_string_values = vec!["Blart Versenwald III"];
4689        let raw_binary_values = [b"Blart Versenwald III".to_vec()];
4690        let raw_binary_value_refs = raw_binary_values
4691            .iter()
4692            .map(|x| x.as_slice())
4693            .collect::<Vec<_>>();
4694
4695        let string_values = StringArray::from(raw_string_values.clone());
4696        let binary_values = BinaryArray::from(raw_binary_value_refs);
4697        let batch = RecordBatch::try_new(
4698            Arc::new(schema),
4699            vec![Arc::new(string_values), Arc::new(binary_values)],
4700        )
4701        .unwrap();
4702
4703        let props = WriterProperties::builder()
4704            .set_statistics_truncate_length(Some(2))
4705            .set_dictionary_enabled(false)
4706            .set_encoding(Encoding::PLAIN)
4707            .set_write_page_header_statistics(true)
4708            .set_compression(crate::basic::Compression::UNCOMPRESSED)
4709            .build();
4710
4711        let file = roundtrip_opts(&batch, props);
4712
4713        // read file and decode page headers
4714        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
4715
4716        // decode first page header
4717        let first_page = &file[4..];
4718        let mut prot = ThriftSliceInputProtocol::new(first_page);
4719        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
4720        let stats = hdr.data_page_header.unwrap().statistics;
4721        assert!(stats.is_some());
4722        let stats = stats.unwrap();
4723        // check that min/max were truncated to 2 bytes; the max's last byte is
4724        // incremented ("Bl" -> "Bm") so it remains a valid upper bound
4724        assert!(!stats.is_max_value_exact.unwrap());
4725        assert!(!stats.is_min_value_exact.unwrap());
4726        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
4727        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
4728
4729        // check second page now
4730        let second_page = &prot.as_slice()[hdr.compressed_page_size as usize..];
4731        let mut prot = ThriftSliceInputProtocol::new(second_page);
4732        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
4733        let stats = hdr.data_page_header.unwrap().statistics;
4734        assert!(stats.is_some());
4735        let stats = stats.unwrap();
4736        // check that min/max were properly truncated
4737        assert!(!stats.is_max_value_exact.unwrap());
4738        assert!(!stats.is_min_value_exact.unwrap());
4739        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
4740        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
4741    }
4742
4743    #[test]
4744    fn test_page_encoding_statistics_roundtrip() {
4745        let batch_schema = Schema::new(vec![Field::new(
4746            "int32",
4747            arrow_schema::DataType::Int32,
4748            false,
4749        )]);
4750
4751        let batch = RecordBatch::try_new(
4752            Arc::new(batch_schema.clone()),
4753            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4754        )
4755        .unwrap();
4756
4757        let mut file: File = tempfile::tempfile().unwrap();
4758        let mut writer = ArrowWriter::try_new(&mut file, Arc::new(batch_schema), None).unwrap();
4759        writer.write(&batch).unwrap();
4760        let file_metadata = writer.close().unwrap();
4761
4762        assert_eq!(file_metadata.num_row_groups(), 1);
4763        assert_eq!(file_metadata.row_group(0).num_columns(), 1);
4764        assert!(
4765            file_metadata
4766                .row_group(0)
4767                .column(0)
4768                .page_encoding_stats()
4769                .is_some()
4770        );
4771        let chunk_page_stats = file_metadata
4772            .row_group(0)
4773            .column(0)
4774            .page_encoding_stats()
4775            .unwrap();
4776
4777        // check that the read metadata is also correct
4778        let options = ReadOptionsBuilder::new()
4779            .with_page_index()
4780            .with_encoding_stats_as_mask(false)
4781            .build();
4782        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
4783
4784        let rowgroup = reader.get_row_group(0).expect("row group missing");
4785        assert_eq!(rowgroup.num_columns(), 1);
4786        let column = rowgroup.metadata().column(0);
4787        assert!(column.page_encoding_stats().is_some());
4788        let file_page_stats = column.page_encoding_stats().unwrap();
4789        assert_eq!(chunk_page_stats, file_page_stats);
4790    }
4791
4792    #[test]
4793    fn test_different_dict_page_size_limit() {
4794        let array = Arc::new(Int64Array::from_iter(0..1024 * 1024));
4795        let schema = Arc::new(Schema::new(vec![
4796            Field::new("col0", arrow_schema::DataType::Int64, false),
4797            Field::new("col1", arrow_schema::DataType::Int64, false),
4798        ]));
4799        let batch =
4800            arrow_array::RecordBatch::try_new(schema.clone(), vec![array.clone(), array]).unwrap();
4801
4802        let props = WriterProperties::builder()
4803            .set_dictionary_page_size_limit(1024 * 1024)
4804            .set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 1024 * 1024 * 4)
4805            .build();
4806        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
4807        writer.write(&batch).unwrap();
4808        let data = Bytes::from(writer.into_inner().unwrap());
4809
4810        let mut metadata = ParquetMetaDataReader::new();
4811        metadata.try_parse(&data).unwrap();
4812        let metadata = metadata.finish().unwrap();
4813        let col0_meta = metadata.row_group(0).column(0);
4814        let col1_meta = metadata.row_group(0).column(1);
4815
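        // Read the first page of a column chunk; with dictionary encoding enabled
        // this is the dictionary page, whose encoded size is returned.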
4816        let get_dict_page_size = move |meta: &ColumnChunkMetaData| {
4817            let mut reader =
4818                SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
4819            let page = reader.get_next_page().unwrap().unwrap();
4820            match page {
4821                Page::DictionaryPage { buf, .. } => buf.len(),
4822                _ => panic!("expected DictionaryPage"),
4823            }
4824        };
4825
4826        assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
4827        assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
4828    }
4829
4830    struct WriteBatchesShape {
4831        num_batches: usize,
4832        rows_per_batch: usize,
4833        row_size: usize,
4834    }
4835
4836    /// Helper that writes batches of the given `WriteBatchesShape` with an `ArrowWriter`
4837    /// and returns a `ParquetRecordBatchReaderBuilder` for the resulting file
4837    fn write_batches(
4838        WriteBatchesShape {
4839            num_batches,
4840            rows_per_batch,
4841            row_size,
4842        }: WriteBatchesShape,
4843        props: WriterProperties,
4844    ) -> ParquetRecordBatchReaderBuilder<File> {
4845        let schema = Arc::new(Schema::new(vec![Field::new(
4846            "str",
4847            ArrowDataType::Utf8,
4848            false,
4849        )]));
4850        let file = tempfile::tempfile().unwrap();
4851        let mut writer =
4852            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
4853
4854        for batch_idx in 0..num_batches {
4855            let strings: Vec<String> = (0..rows_per_batch)
4856                .map(|i| format!("{:0>width$}", batch_idx * 10 + i, width = row_size))
4857                .collect();
4858            let array = StringArray::from(strings);
4859            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
4860            writer.write(&batch).unwrap();
4861        }
4862        writer.close().unwrap();
4863        ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
4864    }
4865
4866    #[test]
4867    // When both limits are None, all data should go into a single row group
4868    fn test_row_group_limit_none_writes_single_row_group() {
4869        let props = WriterProperties::builder()
4870            .set_max_row_group_row_count(None)
4871            .set_max_row_group_bytes(None)
4872            .build();
4873
4874        let builder = write_batches(
4875            WriteBatchesShape {
4876                num_batches: 1,
4877                rows_per_batch: 1000,
4878                row_size: 4,
4879            },
4880            props,
4881        );
4882
4883        assert_eq!(
4884            &row_group_sizes(builder.metadata()),
4885            &[1000],
4886            "With no limits, all rows should be in a single row group"
4887        );
4888    }
4889
4890    #[test]
4891    // When only max_row_group_row_count is set, respect the row limit
4892    fn test_row_group_limit_rows_only() {
4893        let props = WriterProperties::builder()
4894            .set_max_row_group_row_count(Some(300))
4895            .set_max_row_group_bytes(None)
4896            .build();
4897
4898        let builder = write_batches(
4899            WriteBatchesShape {
4900                num_batches: 1,
4901                rows_per_batch: 1000,
4902                row_size: 4,
4903            },
4904            props,
4905        );
4906
4907        assert_eq!(
4908            &row_group_sizes(builder.metadata()),
4909            &[300, 300, 300, 100],
4910            "Row groups should be split by row count"
4911        );
4912    }
4913
4914    #[test]
4915    // When only max_row_group_bytes is set, respect the byte limit
4916    fn test_row_group_limit_bytes_only() {
4917        let props = WriterProperties::builder()
4918            .set_max_row_group_row_count(None)
4919            // Set byte limit to approximately fit ~30 rows worth of data (~100 bytes each)
4920            .set_max_row_group_bytes(Some(3500))
4921            .build();
4922
4923        let builder = write_batches(
4924            WriteBatchesShape {
4925                num_batches: 10,
4926                rows_per_batch: 10,
4927                row_size: 100,
4928            },
4929            props,
4930        );
4931
4932        let sizes = row_group_sizes(builder.metadata());
4933
4934        assert!(
4935            sizes.len() > 1,
4936            "Should have multiple row groups due to byte limit, got {sizes:?}",
4937        );
4938
4939        let total_rows: i64 = sizes.iter().sum();
4940        assert_eq!(total_rows, 100, "Total rows should be preserved");
4941    }
4942
4943    #[test]
4944    // If an in-progress row group is already oversized, it should be flushed before writing more.
4945    fn test_row_group_limit_bytes_flushes_when_current_group_already_too_large() {
4946        let schema = Arc::new(Schema::new(vec![Field::new(
4947            "str",
4948            ArrowDataType::Utf8,
4949            false,
4950        )]));
4951        let file = tempfile::tempfile().unwrap();
4952
4953        // Start with no byte limit so we can intentionally build an oversized in-progress row group.
4954        let props = WriterProperties::builder()
4955            .set_max_row_group_row_count(None)
4956            .set_max_row_group_bytes(None)
4957            .build();
4958        let mut writer =
4959            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
4960
4961        let first_array = StringArray::from(
4962            (0..10)
4963                .map(|i| format!("{:0>100}", i))
4964                .collect::<Vec<String>>(),
4965        );
4966        let first_batch =
4967            RecordBatch::try_new(schema.clone(), vec![Arc::new(first_array)]).unwrap();
4968        writer.write(&first_batch).unwrap();
4969        assert_eq!(writer.in_progress_rows(), 10);
4970
4971        // Tighten the limit below the current in-progress bytes to exercise:
4972        // `if current_bytes >= max_bytes { self.flush()?; ... }`
4973        writer.max_row_group_bytes = Some(1);
4974
4975        let second_array = StringArray::from(vec!["x".to_string()]);
4976        let second_batch =
4977            RecordBatch::try_new(schema.clone(), vec![Arc::new(second_array)]).unwrap();
4978        writer.write(&second_batch).unwrap();
4979        writer.close().unwrap();
4980        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4981
4982        assert_eq!(
4983            &row_group_sizes(builder.metadata()),
4984            &[10, 1],
4985            "The second write should flush an oversized in-progress row group first",
4986        );
4987    }
4988
4989    #[test]
4990    // When both limits are set, the row limit triggers first
4991    fn test_row_group_limit_both_row_wins_single_batch() {
4992        let props = WriterProperties::builder()
4993            .set_max_row_group_row_count(Some(200)) // Will trigger at 200 rows
4994            .set_max_row_group_bytes(Some(1024 * 1024)) // 1MB - won't trigger for this small string data
4995            .build();
4996
4997        let builder = write_batches(
4998            WriteBatchesShape {
4999                num_batches: 1,
5000                row_size: 4,
5001                rows_per_batch: 1000,
5002            },
5003            props,
5004        );
5005
5006        assert_eq!(
5007            &row_group_sizes(builder.metadata()),
5008            &[200, 200, 200, 200, 200],
5009            "Row limit should trigger before byte limit"
5010        );
5011    }
5012
5013    #[test]
5014    // When both limits are set, the row limit triggers first
5015    fn test_row_group_limit_both_row_wins_multiple_batches() {
5016        let props = WriterProperties::builder()
5017            .set_max_row_group_row_count(Some(5)) // Will trigger every 5 rows
5018            .set_max_row_group_bytes(Some(9999)) // Won't trigger
5019            .build();
5020
5021        let builder = write_batches(
5022            WriteBatchesShape {
5023                num_batches: 10,
5024                rows_per_batch: 10,
5025                row_size: 100,
5026            },
5027            props,
5028        );
5029
5030        assert_eq!(
5031            &row_group_sizes(builder.metadata()),
5032            &[5; 20],
5033            "Row limit should trigger before byte limit"
5034        );
5035    }
5036
5037    #[test]
5038    // When both limits are set, the byte limit triggers first
5039    fn test_row_group_limit_both_bytes_wins() {
5040        let props = WriterProperties::builder()
5041            .set_max_row_group_row_count(Some(1000)) // Won't trigger for 100 rows
5042            .set_max_row_group_bytes(Some(3500)) // Will trigger at ~30-35 rows
5043            .build();
5044
5045        let builder = write_batches(
5046            WriteBatchesShape {
5047                num_batches: 10,
5048                rows_per_batch: 10,
5049                row_size: 100,
5050            },
5051            props,
5052        );
5053
5054        let sizes = row_group_sizes(builder.metadata());
5055
5056        assert!(
5057            sizes.len() > 1,
5058            "Byte limit should trigger before row limit, got {sizes:?}",
5059        );
5060
5061        assert!(
5062            sizes.iter().all(|&s| s < 1000),
5063            "No row group should hit the row limit"
5064        );
5065
5066        let total_rows: i64 = sizes.iter().sum();
5067        assert_eq!(total_rows, 100, "Total rows should be preserved");
5068    }
5069}