parquet/arrow/arrow_writer/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains writer which writes arrow data into parquet data.
19
20use bytes::Bytes;
21use std::io::{Read, Write};
22use std::iter::Peekable;
23use std::slice::Iter;
24use std::sync::{Arc, Mutex};
25use std::vec::IntoIter;
26use thrift::protocol::TCompactOutputProtocol;
27
28use arrow_array::cast::AsArray;
29use arrow_array::types::*;
30use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
31use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};
32
33use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};
34
35use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
36use crate::arrow::ArrowSchemaConverter;
37use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
38use crate::column::page_encryption::PageEncryptor;
39use crate::column::writer::encoder::ColumnValueEncoder;
40use crate::column::writer::{
41    get_column_writer, ColumnCloseResult, ColumnWriter, GenericColumnWriter,
42};
43use crate::data_type::{ByteArray, FixedLenByteArray};
44#[cfg(feature = "encryption")]
45use crate::encryption::encrypt::FileEncryptor;
46use crate::errors::{ParquetError, Result};
47use crate::file::metadata::{KeyValue, RowGroupMetaData};
48use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
49use crate::file::reader::{ChunkReader, Length};
50use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
51use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
52use crate::thrift::TSerializable;
53use levels::{calculate_array_levels, ArrayLevels};
54
55mod byte_array;
56mod levels;
57
58/// Encodes [`RecordBatch`] to parquet
59///
60/// Writes Arrow `RecordBatch`es to a Parquet writer. Multiple [`RecordBatch`] will be encoded
61/// to the same row group, up to `max_row_group_size` rows. Any remaining rows will be
62/// flushed on close, leading the final row group in the output file to potentially
63/// contain fewer than `max_row_group_size` rows
64///
65/// # Example: Writing `RecordBatch`es
66/// ```
67/// # use std::sync::Arc;
68/// # use bytes::Bytes;
69/// # use arrow_array::{ArrayRef, Int64Array};
70/// # use arrow_array::RecordBatch;
71/// # use parquet::arrow::arrow_writer::ArrowWriter;
72/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
73/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
74/// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
75///
76/// let mut buffer = Vec::new();
77/// let mut writer = ArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
78/// writer.write(&to_write).unwrap();
79/// writer.close().unwrap();
80///
81/// let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap();
82/// let read = reader.next().unwrap().unwrap();
83///
84/// assert_eq!(to_write, read);
85/// ```
86///
87/// # Memory Usage and Limiting
88///
89/// The nature of Parquet requires buffering of an entire row group before it can
90/// be flushed to the underlying writer. Data is mostly buffered in its encoded
91/// form, reducing memory usage. However, some data such as dictionary keys,
92/// large strings or very nested data may still result in non-trivial memory
93/// usage.
94///
95/// See Also:
96/// * [`ArrowWriter::memory_size`]: the current memory usage of the writer.
/// * [`ArrowWriter::in_progress_size`]: Estimated size of the buffered row group.
98///
99/// Call [`Self::flush`] to trigger an early flush of a row group based on a
/// memory threshold and/or global memory pressure. However, smaller row groups
101/// result in higher metadata overheads, and thus may worsen compression ratios
102/// and query performance.
103///
104/// ```no_run
105/// # use std::io::Write;
106/// # use arrow_array::RecordBatch;
107/// # use parquet::arrow::ArrowWriter;
108/// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
109/// # let batch: RecordBatch = todo!();
110/// writer.write(&batch).unwrap();
111/// // Trigger an early flush if anticipated size exceeds 1_000_000
112/// if writer.in_progress_size() > 1_000_000 {
113///     writer.flush().unwrap();
114/// }
115/// ```
116///
117/// ## Type Support
118///
119/// The writer supports writing all Arrow [`DataType`]s that have a direct mapping to
/// Parquet types including [`StructArray`] and [`ListArray`].
121///
122/// The following are not supported:
123///
124/// * [`IntervalMonthDayNanoArray`]: Parquet does not [support nanosecond intervals].
125///
126/// [`DataType`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html
127/// [`StructArray`]: https://docs.rs/arrow/latest/arrow/array/struct.StructArray.html
128/// [`ListArray`]: https://docs.rs/arrow/latest/arrow/array/type.ListArray.html
129/// [`IntervalMonthDayNanoArray`]: https://docs.rs/arrow/latest/arrow/array/type.IntervalMonthDayNanoArray.html
130/// [support nanosecond intervals]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
pub struct ArrowWriter<W: Write> {
    /// Underlying Parquet writer
    writer: SerializedFileWriter<W>,

    /// The in-progress row group if any
    ///
    /// `None` when no rows are currently buffered awaiting flush
    in_progress: Option<ArrowRowGroupWriter>,

    /// A copy of the Arrow schema.
    ///
    /// The schema is used to verify that each record batch written has the correct schema
    arrow_schema: SchemaRef,

    /// Creates new [`ArrowRowGroupWriter`] instances as required
    row_group_writer_factory: ArrowRowGroupWriterFactory,

    /// The length of arrays to write to each row group
    max_row_group_size: usize,
}
149
150impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
151    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152        let buffered_memory = self.in_progress_size();
153        f.debug_struct("ArrowWriter")
154            .field("writer", &self.writer)
155            .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
156            .field("in_progress_rows", &self.in_progress_rows())
157            .field("arrow_schema", &self.arrow_schema)
158            .field("max_row_group_size", &self.max_row_group_size)
159            .finish()
160    }
161}
162
impl<W: Write + Send> ArrowWriter<W> {
    /// Try to create a new Arrow writer
    ///
    /// The writer will fail if:
    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
    ///  * the Arrow schema contains unsupported datatypes such as Unions
    pub fn try_new(
        writer: W,
        arrow_schema: SchemaRef,
        props: Option<WriterProperties>,
    ) -> Result<Self> {
        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
        Self::try_new_with_options(writer, arrow_schema, options)
    }

    /// Try to create a new Arrow writer with [`ArrowWriterOptions`].
    ///
    /// The writer will fail if:
    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
    ///  * the Arrow schema contains unsupported datatypes such as Unions
    pub fn try_new_with_options(
        writer: W,
        arrow_schema: SchemaRef,
        options: ArrowWriterOptions,
    ) -> Result<Self> {
        let mut props = options.properties;
        // Derive the parquet schema from the arrow schema, honoring any
        // configured type coercion and root schema name override
        let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
        if let Some(schema_root) = &options.schema_root {
            converter = converter.schema_root(schema_root);
        }
        let schema = converter.convert(&arrow_schema)?;
        if !options.skip_arrow_metadata {
            // add serialized arrow schema
            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
        }

        let max_row_group_size = props.max_row_group_size();

        let file_writer =
            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::new(props))?;

        let row_group_writer_factory = ArrowRowGroupWriterFactory::new(&file_writer);

        Ok(Self {
            writer: file_writer,
            in_progress: None,
            arrow_schema,
            row_group_writer_factory,
            max_row_group_size,
        })
    }

    /// Returns metadata for any flushed row groups
    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
        self.writer.flushed_row_groups()
    }

    /// Estimated memory usage, in bytes, of this `ArrowWriter`
    ///
    /// This estimate is formed by summing the values of
    /// [`ArrowColumnWriter::memory_size`] for all in progress columns.
    pub fn memory_size(&self) -> usize {
        match &self.in_progress {
            Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
            None => 0,
        }
    }

    /// Anticipated encoded size of the in progress row group.
    ///
    /// This estimates the size of the row group after it has been completely
    /// encoded, formed by summing the values of
    /// [`ArrowColumnWriter::get_estimated_total_bytes`] for all in progress
    /// columns.
    pub fn in_progress_size(&self) -> usize {
        match &self.in_progress {
            Some(in_progress) => in_progress
                .writers
                .iter()
                .map(|x| x.get_estimated_total_bytes())
                .sum(),
            None => 0,
        }
    }

    /// Returns the number of rows buffered in the in progress row group
    pub fn in_progress_rows(&self) -> usize {
        self.in_progress
            .as_ref()
            .map(|x| x.buffered_rows)
            .unwrap_or_default()
    }

    /// Returns the number of bytes written by this instance
    pub fn bytes_written(&self) -> usize {
        self.writer.bytes_written()
    }

    /// Encodes the provided [`RecordBatch`]
    ///
    /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`]
    /// rows, the contents of `batch` will be written to one or more row groups such that all but
    /// the final row group in the file contain [`WriterProperties::max_row_group_size`] rows.
    ///
    /// This will fail if the `batch`'s schema does not match the writer's schema.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        if batch.num_rows() == 0 {
            return Ok(());
        }

        // Lazily create a row group writer on the first write after a flush
        let in_progress = match &mut self.in_progress {
            Some(in_progress) => in_progress,
            x => x.insert(self.row_group_writer_factory.create_row_group_writer(
                self.writer.schema_descr(),
                self.writer.properties(),
                &self.arrow_schema,
                self.writer.flushed_row_groups().len(),
            )?),
        };

        // If would exceed max_row_group_size, split batch
        if in_progress.buffered_rows + batch.num_rows() > self.max_row_group_size {
            let to_write = self.max_row_group_size - in_progress.buffered_rows;
            let a = batch.slice(0, to_write);
            let b = batch.slice(to_write, batch.num_rows() - to_write);
            // Writing `a` fills (and flushes) the current row group; `b` is
            // then written recursively, potentially spanning further groups
            self.write(&a)?;
            return self.write(&b);
        }

        in_progress.write(batch)?;

        if in_progress.buffered_rows >= self.max_row_group_size {
            self.flush()?
        }
        Ok(())
    }

    /// Flushes all buffered rows into a new row group
    pub fn flush(&mut self) -> Result<()> {
        // No-op when nothing is buffered
        let in_progress = match self.in_progress.take() {
            Some(in_progress) => in_progress,
            None => return Ok(()),
        };

        let mut row_group_writer = self.writer.next_row_group()?;
        for chunk in in_progress.close()? {
            chunk.append_to_row_group(&mut row_group_writer)?;
        }
        row_group_writer.close()?;
        Ok(())
    }

    /// Additional [`KeyValue`] metadata to be written in addition to those from [`WriterProperties`]
    ///
    /// This method provides a way to append kv_metadata after writing RecordBatches
    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
        self.writer.append_key_value_metadata(kv_metadata)
    }

    /// Returns a reference to the underlying writer.
    pub fn inner(&self) -> &W {
        self.writer.inner()
    }

    /// Returns a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer, doing so
    /// will likely result in a corrupt parquet file
    pub fn inner_mut(&mut self) -> &mut W {
        self.writer.inner_mut()
    }

    /// Flushes any outstanding data and returns the underlying writer.
    pub fn into_inner(mut self) -> Result<W> {
        self.flush()?;
        self.writer.into_inner()
    }

    /// Close and finalize the underlying Parquet writer
    ///
    /// Unlike [`Self::close`] this does not consume self
    ///
    /// Attempting to write after calling finish will result in an error
    pub fn finish(&mut self) -> Result<crate::format::FileMetaData> {
        self.flush()?;
        self.writer.finish()
    }

    /// Close and finalize the underlying Parquet writer
    pub fn close(mut self) -> Result<crate::format::FileMetaData> {
        self.finish()
    }
}
356
357impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
358    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
359        self.write(batch).map_err(|e| e.into())
360    }
361
362    fn close(self) -> std::result::Result<(), ArrowError> {
363        self.close()?;
364        Ok(())
365    }
366}
367
368/// Arrow-specific configuration settings for writing parquet files.
369///
370/// See [`ArrowWriter`] for how to configure the writer.
#[derive(Debug, Clone, Default)]
pub struct ArrowWriterOptions {
    /// Parquet writer settings such as compression and row group size
    properties: WriterProperties,
    /// If `true`, the serialized Arrow schema is not embedded in the file metadata
    skip_arrow_metadata: bool,
    /// Optional override for the name of the root parquet schema element
    schema_root: Option<String>,
}
377
378impl ArrowWriterOptions {
379    /// Creates a new [`ArrowWriterOptions`] with the default settings.
380    pub fn new() -> Self {
381        Self::default()
382    }
383
384    /// Sets the [`WriterProperties`] for writing parquet files.
385    pub fn with_properties(self, properties: WriterProperties) -> Self {
386        Self { properties, ..self }
387    }
388
389    /// Skip encoding the embedded arrow metadata (defaults to `false`)
390    ///
391    /// Parquet files generated by the [`ArrowWriter`] contain embedded arrow schema
392    /// by default.
393    ///
394    /// Set `skip_arrow_metadata` to true, to skip encoding the embedded metadata.
395    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
396        Self {
397            skip_arrow_metadata,
398            ..self
399        }
400    }
401
402    /// Set the name of the root parquet schema element (defaults to `"arrow_schema"`)
403    pub fn with_schema_root(self, schema_root: String) -> Self {
404        Self {
405            schema_root: Some(schema_root),
406            ..self
407        }
408    }
409}
410
411/// A single column chunk produced by [`ArrowColumnWriter`]
#[derive(Default)]
struct ArrowColumnChunkData {
    /// Total number of buffered bytes across all entries in `data`
    length: usize,
    /// Buffered page bytes (each header immediately followed by its page), in write order
    data: Vec<Bytes>,
}
417
418impl Length for ArrowColumnChunkData {
419    fn len(&self) -> u64 {
420        self.length as _
421    }
422}
423
impl ChunkReader for ArrowColumnChunkData {
    type T = ArrowColumnChunkReader;

    /// Returns a [`Read`] over the buffered page data
    ///
    /// Only reads starting at offset 0 are supported, as the chunk is
    /// consumed in a single pass when appended to a row group
    fn get_read(&self, start: u64) -> Result<Self::T> {
        assert_eq!(start, 0); // Assume append_column writes all data in one-shot
        Ok(ArrowColumnChunkReader(
            self.data.clone().into_iter().peekable(),
        ))
    }

    /// Random access is not needed by the writer path and is unimplemented
    fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
        unimplemented!()
    }
}
438
/// A [`Read`] for [`ArrowColumnChunkData`]
///
/// Yields the bytes of each buffered [`Bytes`] in turn, advancing to the
/// next buffer once the current one is exhausted
struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
441
442impl Read for ArrowColumnChunkReader {
443    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
444        let buffer = loop {
445            match self.0.peek_mut() {
446                Some(b) if b.is_empty() => {
447                    self.0.next();
448                    continue;
449                }
450                Some(b) => break b,
451                None => return Ok(0),
452            }
453        };
454
455        let len = buffer.len().min(out.len());
456        let b = buffer.split_to(len);
457        out[..len].copy_from_slice(&b);
458        Ok(len)
459    }
460}
461
462/// A shared [`ArrowColumnChunkData`]
463///
464/// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access via
465/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows
466type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
467
#[derive(Default)]
struct ArrowPageWriter {
    /// Destination buffer, shared with the [`ArrowColumnWriter`] that owns this page writer
    buffer: SharedColumnChunk,
    /// Encrypts pages and page headers before buffering, when configured
    #[cfg(feature = "encryption")]
    page_encryptor: Option<PageEncryptor>,
}
474
impl ArrowPageWriter {
    /// Sets the optional [`PageEncryptor`] used to encrypt written pages
    #[cfg(feature = "encryption")]
    pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
        self.page_encryptor = page_encryptor;
        self
    }

    /// Returns a mutable reference to the page encryptor, if configured
    #[cfg(feature = "encryption")]
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        self.page_encryptor.as_mut()
    }

    /// Without the `encryption` feature there is never a page encryptor
    #[cfg(not(feature = "encryption"))]
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        None
    }
}
492
impl PageWriter for ArrowPageWriter {
    /// Serializes `page` (header followed by page bytes) into the shared
    /// buffer and returns the [`PageWriteSpec`] describing where it landed.
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
        // Encrypt the page body first, if an encryptor is configured
        let page = match self.page_encryptor_mut() {
            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
            None => page,
        };

        let page_header = page.to_thrift_header();
        let header = {
            let mut header = Vec::with_capacity(1024);

            match self.page_encryptor_mut() {
                Some(page_encryptor) => {
                    page_encryptor.encrypt_page_header(&page_header, &mut header)?;
                    // Only data pages advance the encryptor's page counter
                    if page.compressed_page().is_data_page() {
                        page_encryptor.increment_page();
                    }
                }
                None => {
                    // Serialize the header using the thrift compact protocol
                    let mut protocol = TCompactOutputProtocol::new(&mut header);
                    page_header.write_to_out_protocol(&mut protocol)?;
                }
            };

            Bytes::from(header)
        };

        // NOTE(review): try_lock assumes no concurrent access to the shared
        // buffer while a page is being written — confirm against callers
        let mut buf = self.buffer.try_lock().unwrap();

        let data = page.compressed_page().buffer().clone();
        let compressed_size = data.len() + header.len();

        let mut spec = PageWriteSpec::new();
        spec.page_type = page.page_type();
        spec.num_values = page.num_values();
        spec.uncompressed_size = page.uncompressed_size() + header.len();
        spec.offset = buf.length as u64; // byte offset of this page within the chunk
        spec.compressed_size = compressed_size;
        spec.bytes_written = compressed_size as u64;

        buf.length += compressed_size;
        buf.data.push(header);
        buf.data.push(data);

        Ok(spec)
    }

    /// Nothing to flush: data is consumed from the shared buffer on close
    fn close(&mut self) -> Result<()> {
        Ok(())
    }
}
544
/// A leaf column that can be encoded by [`ArrowColumnWriter`]
///
/// Wraps the level information and values for a single leaf column,
/// as produced by [`compute_leaves`]
#[derive(Debug)]
pub struct ArrowLeafColumn(ArrayLevels);
548
549/// Computes the [`ArrowLeafColumn`] for a potentially nested [`ArrayRef`]
550pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
551    let levels = calculate_array_levels(array, field)?;
552    Ok(levels.into_iter().map(ArrowLeafColumn).collect())
553}
554
/// The data for a single column chunk, see [`ArrowColumnWriter`]
pub struct ArrowColumnChunk {
    /// The encoded page data for this column chunk
    data: ArrowColumnChunkData,
    /// The result produced when the underlying column writer was closed
    close: ColumnCloseResult,
}
560
561impl std::fmt::Debug for ArrowColumnChunk {
562    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
563        f.debug_struct("ArrowColumnChunk")
564            .field("length", &self.data.length)
565            .finish_non_exhaustive()
566    }
567}
568
impl ArrowColumnChunk {
    /// Calls [`SerializedRowGroupWriter::append_column`] with this column's data
    ///
    /// Consumes `self`, transferring the encoded pages and column metadata
    /// to `writer`
    pub fn append_to_row_group<W: Write + Send>(
        self,
        writer: &mut SerializedRowGroupWriter<'_, W>,
    ) -> Result<()> {
        writer.append_column(&self.data, self.close)
    }
}
578
579/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
580///
581/// Note: This is a low-level interface for applications that require
582/// fine-grained control of encoding (e.g. encoding using multiple threads),
583/// see [`ArrowWriter`] for a higher-level interface
584///
585/// # Example: Encoding two Arrow Array's in Parallel
586/// ```
587/// // The arrow schema
588/// # use std::sync::Arc;
589/// # use arrow_array::*;
590/// # use arrow_schema::*;
591/// # use parquet::arrow::ArrowSchemaConverter;
592/// # use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, get_column_writers, ArrowColumnChunk};
593/// # use parquet::file::properties::WriterProperties;
594/// # use parquet::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
595/// #
596/// let schema = Arc::new(Schema::new(vec![
597///     Field::new("i32", DataType::Int32, false),
598///     Field::new("f32", DataType::Float32, false),
599/// ]));
600///
601/// // Compute the parquet schema
602/// let props = Arc::new(WriterProperties::default());
603/// let parquet_schema = ArrowSchemaConverter::new()
604///   .with_coerce_types(props.coerce_types())
605///   .convert(&schema)
606///   .unwrap();
607///
608/// // Create writers for each of the leaf columns
609/// let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
610///
611/// // Spawn a worker thread for each column
612/// //
613/// // Note: This is for demonstration purposes, a thread-pool e.g. rayon or tokio, would be better.
614/// // The `map` produces an iterator of type `tuple of (thread handle, send channel)`.
615/// let mut workers: Vec<_> = col_writers
616///     .into_iter()
617///     .map(|mut col_writer| {
618///         let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
619///         let handle = std::thread::spawn(move || {
620///             // receive Arrays to encode via the channel
621///             for col in recv {
622///                 col_writer.write(&col)?;
623///             }
624///             // once the input is complete, close the writer
625///             // to return the newly created ArrowColumnChunk
626///             col_writer.close()
627///         });
628///         (handle, send)
629///     })
630///     .collect();
631///
632/// // Create parquet writer
633/// let root_schema = parquet_schema.root_schema_ptr();
634/// // write to memory in the example, but this could be a File
635/// let mut out = Vec::with_capacity(1024);
636/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone())
637///   .unwrap();
638///
639/// // Start row group
640/// let mut row_group_writer: SerializedRowGroupWriter<'_, _> = writer
641///   .next_row_group()
642///   .unwrap();
643///
644/// // Create some example input columns to encode
645/// let to_write = vec![
646///     Arc::new(Int32Array::from_iter_values([1, 2, 3])) as _,
647///     Arc::new(Float32Array::from_iter_values([1., 45., -1.])) as _,
648/// ];
649///
650/// // Send the input columns to the workers
651/// let mut worker_iter = workers.iter_mut();
652/// for (arr, field) in to_write.iter().zip(&schema.fields) {
653///     for leaves in compute_leaves(field, arr).unwrap() {
654///         worker_iter.next().unwrap().1.send(leaves).unwrap();
655///     }
656/// }
657///
658/// // Wait for the workers to complete encoding, and append
659/// // the resulting column chunks to the row group (and the file)
660/// for (handle, send) in workers {
661///     drop(send); // Drop send side to signal termination
662///     // wait for the worker to send the completed chunk
663///     let chunk: ArrowColumnChunk = handle.join().unwrap().unwrap();
664///     chunk.append_to_row_group(&mut row_group_writer).unwrap();
665/// }
666/// // Close the row group which writes to the underlying file
667/// row_group_writer.close().unwrap();
668///
669/// let metadata = writer.close().unwrap();
670/// assert_eq!(metadata.num_rows, 3);
671/// ```
pub struct ArrowColumnWriter {
    /// The underlying column encoder
    writer: ArrowColumnWriterImpl,
    /// Buffer into which the encoder's page writer accumulates encoded pages
    chunk: SharedColumnChunk,
}
676
677impl std::fmt::Debug for ArrowColumnWriter {
678    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
679        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
680    }
681}
682
/// The underlying encoder for a leaf column, see [`ArrowColumnWriter`]
enum ArrowColumnWriterImpl {
    /// Byte-array-like columns, encoded directly from arrow arrays
    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
    /// All other column types, encoded via the generic [`ColumnWriter`]
    Column(ColumnWriter<'static>),
}
687
impl ArrowColumnWriter {
    /// Write an [`ArrowLeafColumn`]
    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
        match &mut self.writer {
            ArrowColumnWriterImpl::Column(c) => {
                write_leaf(c, &col.0)?;
            }
            ArrowColumnWriterImpl::ByteArray(c) => {
                write_primitive(c, col.0.array().as_ref(), &col.0)?;
            }
        }
        Ok(())
    }

    /// Close this column returning the written [`ArrowColumnChunk`]
    pub fn close(self) -> Result<ArrowColumnChunk> {
        let close = match self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
            ArrowColumnWriterImpl::Column(c) => c.close()?,
        };
        // The column writer (and its page writer) was consumed above, so this
        // should now be the only reference to the shared chunk buffer
        let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
        let data = chunk.into_inner().unwrap();
        Ok(ArrowColumnChunk { data, close })
    }

    /// Returns the estimated total memory usage by the writer.
    ///
    /// Unlike [`Self::get_estimated_total_bytes`], this is an estimate
    /// of the current memory usage and not its anticipated encoded size.
    ///
    /// This includes:
    /// 1. Data buffered in encoded form
    /// 2. Data buffered in un-encoded form (e.g. `usize` dictionary keys)
    ///
    /// This value should be greater than or equal to [`Self::get_estimated_total_bytes`]
    pub fn memory_size(&self) -> usize {
        match &self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
            ArrowColumnWriterImpl::Column(c) => c.memory_size(),
        }
    }

    /// Returns the estimated total encoded bytes for this column writer.
    ///
    /// This includes:
    /// 1. Data buffered in encoded form
    /// 2. An estimate of how large the data buffered in un-encoded form would be once encoded
    ///
    /// This value should be less than or equal to [`Self::memory_size`]
    pub fn get_estimated_total_bytes(&self) -> usize {
        match &self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
            ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
        }
    }
}
744
/// Encodes [`RecordBatch`] to a parquet row group
struct ArrowRowGroupWriter {
    /// One writer per parquet leaf column, in schema order
    writers: Vec<ArrowColumnWriter>,
    /// The arrow schema used to compute leaf columns from record batches
    schema: SchemaRef,
    /// Number of rows buffered so far in this row group
    buffered_rows: usize,
}
751
752impl ArrowRowGroupWriter {
753    fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
754        Self {
755            writers,
756            schema: arrow.clone(),
757            buffered_rows: 0,
758        }
759    }
760
761    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
762        self.buffered_rows += batch.num_rows();
763        let mut writers = self.writers.iter_mut();
764        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
765            for leaf in compute_leaves(field.as_ref(), column)? {
766                writers.next().unwrap().write(&leaf)?
767            }
768        }
769        Ok(())
770    }
771
772    fn close(self) -> Result<Vec<ArrowColumnChunk>> {
773        self.writers
774            .into_iter()
775            .map(|writer| writer.close())
776            .collect()
777    }
778}
779
/// Creates [`ArrowRowGroupWriter`] instances as row groups are started
struct ArrowRowGroupWriterFactory {
    /// Encryptor captured from the file writer, if encryption is enabled
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
784
impl ArrowRowGroupWriterFactory {
    /// Creates a factory, capturing the file encryptor (if any) from the file writer
    #[cfg(feature = "encryption")]
    fn new<W: Write + Send>(file_writer: &SerializedFileWriter<W>) -> Self {
        Self {
            file_encryptor: file_writer.file_encryptor(),
        }
    }

    /// Creates a factory when encryption support is not compiled in
    #[cfg(not(feature = "encryption"))]
    fn new<W: Write + Send>(_file_writer: &SerializedFileWriter<W>) -> Self {
        Self {}
    }

    /// Creates the column writers for a new row group, threading the file
    /// encryptor and row group index through for per-module encryption
    #[cfg(feature = "encryption")]
    fn create_row_group_writer(
        &self,
        parquet: &SchemaDescriptor,
        props: &WriterPropertiesPtr,
        arrow: &SchemaRef,
        row_group_index: usize,
    ) -> Result<ArrowRowGroupWriter> {
        let writers = get_column_writers_with_encryptor(
            parquet,
            props,
            arrow,
            self.file_encryptor.clone(),
            row_group_index,
        )?;
        Ok(ArrowRowGroupWriter::new(writers, arrow))
    }

    /// Creates the column writers for a new row group (no encryption support)
    #[cfg(not(feature = "encryption"))]
    fn create_row_group_writer(
        &self,
        parquet: &SchemaDescriptor,
        props: &WriterPropertiesPtr,
        arrow: &SchemaRef,
        _row_group_index: usize,
    ) -> Result<ArrowRowGroupWriter> {
        let writers = get_column_writers(parquet, props, arrow)?;
        Ok(ArrowRowGroupWriter::new(writers, arrow))
    }
}
828
829/// Returns the [`ArrowColumnWriter`] for a given schema
830pub fn get_column_writers(
831    parquet: &SchemaDescriptor,
832    props: &WriterPropertiesPtr,
833    arrow: &SchemaRef,
834) -> Result<Vec<ArrowColumnWriter>> {
835    let mut writers = Vec::with_capacity(arrow.fields.len());
836    let mut leaves = parquet.columns().iter();
837    let column_factory = ArrowColumnWriterFactory::new();
838    for field in &arrow.fields {
839        column_factory.get_arrow_column_writer(
840            field.data_type(),
841            props,
842            &mut leaves,
843            &mut writers,
844        )?;
845    }
846    Ok(writers)
847}
848
/// Returns the [`ArrowColumnWriter`] for a given schema and supports columnar encryption
///
/// Same as [`get_column_writers`], but the factory is configured with the
/// row group index and optional file encryptor so per-column page
/// encryption can be applied where required.
#[cfg(feature = "encryption")]
fn get_column_writers_with_encryptor(
    parquet: &SchemaDescriptor,
    props: &WriterPropertiesPtr,
    arrow: &SchemaRef,
    file_encryptor: Option<Arc<FileEncryptor>>,
    row_group_index: usize,
) -> Result<Vec<ArrowColumnWriter>> {
    let factory =
        ArrowColumnWriterFactory::new().with_file_encryptor(row_group_index, file_encryptor);
    let mut leaf_columns = parquet.columns().iter();
    let mut writers = Vec::with_capacity(arrow.fields.len());
    for field in arrow.fields.iter() {
        factory.get_arrow_column_writer(
            field.data_type(),
            props,
            &mut leaf_columns,
            &mut writers,
        )?;
    }
    Ok(writers)
}
872
/// Gets [`ArrowColumnWriter`] instances for different data types
struct ArrowColumnWriterFactory {
    // Index of the row group being written; used when deriving per-column
    // page encryption (only present with the `encryption` feature).
    #[cfg(feature = "encryption")]
    row_group_index: usize,
    // Optional encryptor shared with the file writer; `None` means column
    // data is written unencrypted.
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
880
impl ArrowColumnWriterFactory {
    /// Creates a factory with encryption disabled / unset.
    pub fn new() -> Self {
        Self {
            #[cfg(feature = "encryption")]
            row_group_index: 0,
            #[cfg(feature = "encryption")]
            file_encryptor: None,
        }
    }

    /// Configures the row group index and optional file encryptor used when
    /// creating page writers for encrypted columns.
    #[cfg(feature = "encryption")]
    pub fn with_file_encryptor(
        mut self,
        row_group_index: usize,
        file_encryptor: Option<Arc<FileEncryptor>>,
    ) -> Self {
        self.row_group_index = row_group_index;
        self.file_encryptor = file_encryptor;
        self
    }

    /// Creates the page writer for one leaf column, attaching a
    /// [`PageEncryptor`] if this particular column is configured for
    /// encryption (columnar encryption is per-column).
    #[cfg(feature = "encryption")]
    fn create_page_writer(
        &self,
        column_descriptor: &ColumnDescPtr,
        column_index: usize,
    ) -> Result<Box<ArrowPageWriter>> {
        let column_path = column_descriptor.path().string();
        let page_encryptor = PageEncryptor::create_if_column_encrypted(
            &self.file_encryptor,
            self.row_group_index,
            column_index,
            &column_path,
        )?;
        Ok(Box::new(
            ArrowPageWriter::default().with_encryptor(page_encryptor),
        ))
    }

    /// Creates a plain page writer (no encryption support compiled in).
    #[cfg(not(feature = "encryption"))]
    fn create_page_writer(
        &self,
        _column_descriptor: &ColumnDescPtr,
        _column_index: usize,
    ) -> Result<Box<ArrowPageWriter>> {
        Ok(Box::<ArrowPageWriter>::default())
    }

    /// Gets the [`ArrowColumnWriter`] for the given `data_type`
    ///
    /// Recurses through nested types (lists, structs, maps), consuming one
    /// descriptor from `leaves` per parquet leaf column and appending the
    /// corresponding writer to `out`. The traversal order must match the
    /// parquet schema conversion so each leaf pairs with its descriptor.
    fn get_arrow_column_writer(
        &self,
        data_type: &ArrowDataType,
        props: &WriterPropertiesPtr,
        leaves: &mut Iter<'_, ColumnDescPtr>,
        out: &mut Vec<ArrowColumnWriter>,
    ) -> Result<()> {
        // Builds a generic column writer for fixed-width / primitive leaves.
        let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
            let page_writer = self.create_page_writer(desc, out.len())?;
            let chunk = page_writer.buffer.clone();
            let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
            Ok(ArrowColumnWriter {
                chunk,
                writer: ArrowColumnWriterImpl::Column(writer),
            })
        };

        // Builds the specialized byte-array writer for variable-length leaves.
        let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
            let page_writer = self.create_page_writer(desc, out.len())?;
            let chunk = page_writer.buffer.clone();
            let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
            Ok(ArrowColumnWriter {
                chunk,
                writer: ArrowColumnWriterImpl::ByteArray(writer),
            })
        };

        match data_type {
            _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => out.push(col(leaves.next().unwrap())?),
            ArrowDataType::LargeBinary
            | ArrowDataType::Binary
            | ArrowDataType::Utf8
            | ArrowDataType::LargeUtf8
            | ArrowDataType::BinaryView
            | ArrowDataType::Utf8View => {
                out.push(bytes(leaves.next().unwrap())?)
            }
            // Lists contribute the writers of their single child field.
            ArrowDataType::List(f)
            | ArrowDataType::LargeList(f)
            | ArrowDataType::FixedSizeList(f, _) => {
                self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
            }
            ArrowDataType::Struct(fields) => {
                for field in fields {
                    self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
                }
            }
            // A map's entries are a struct of exactly (key, value).
            ArrowDataType::Map(f, _) => match f.data_type() {
                ArrowDataType::Struct(f) => {
                    self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
                    self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
                }
                _ => unreachable!("invalid map type"),
            }
            // Dictionaries are written by value type; string/binary values use
            // the byte-array writer, everything else the generic writer.
            ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Binary | ArrowDataType::LargeBinary => {
                    out.push(bytes(leaves.next().unwrap())?)
                }
                ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
                    out.push(bytes(leaves.next().unwrap())?)
                }
                _ => {
                    out.push(col(leaves.next().unwrap())?)
                }
            }
            _ => return Err(ParquetError::NYI(
                format!(
                    "Attempting to write an Arrow type {data_type:?} to parquet that is not yet implemented"
                )
            ))
        }
        Ok(())
    }
}
1005
/// Writes the values of a single leaf column to `writer`, converting the
/// Arrow data to the parquet physical type the column writer expects.
///
/// Returns the number of values written (as reported by the column writer).
fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
    let column = levels.array().as_ref();
    // Positions of non-null values; only these are written for nullable data.
    let indices = levels.non_null_indices();
    match writer {
        ColumnWriter::Int32ColumnWriter(ref mut typed) => {
            match column.data_type() {
                ArrowDataType::Date64 => {
                    // If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
                    let array = arrow_cast::cast(column, &ArrowDataType::Date32)?;
                    let array = arrow_cast::cast(&array, &ArrowDataType::Int32)?;

                    let array = array.as_primitive::<Int32Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::UInt32 => {
                    let values = column.as_primitive::<UInt32Type>().values();
                    // follow C++ implementation and use overflow/reinterpret cast from  u32 to i32 which will map
                    // `(i32::MAX as u32)..u32::MAX` to `i32::MIN..0`
                    let array = values.inner().typed_data::<i32>();
                    write_primitive(typed, array, levels)
                }
                ArrowDataType::Decimal128(_, _) => {
                    // use the int32 to represent the decimal with low precision
                    let array = column
                        .as_primitive::<Decimal128Type>()
                        .unary::<_, Int32Type>(|v| v as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal256(_, _) => {
                    // use the int32 to represent the decimal with low precision
                    let array = column
                        .as_primitive::<Decimal256Type>()
                        .unary::<_, Int32Type>(|v| v.as_i128() as i32);
                    write_primitive(typed, array.values(), levels)
                }
                // Dictionaries of decimals are materialized to their value type
                // first, then narrowed to i32 like the non-dictionary cases.
                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                    ArrowDataType::Decimal128(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal128Type>()
                            .unary::<_, Int32Type>(|v| v as i32);
                        write_primitive(typed, array.values(), levels)
                    }
                    ArrowDataType::Decimal256(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal256Type>()
                            .unary::<_, Int32Type>(|v| v.as_i128() as i32);
                        write_primitive(typed, array.values(), levels)
                    }
                    _ => {
                        let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
                        let array = array.as_primitive::<Int32Type>();
                        write_primitive(typed, array.values(), levels)
                    }
                },
                // Any other Arrow type stored as INT32 is handled via cast.
                _ => {
                    let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
                    let array = array.as_primitive::<Int32Type>();
                    write_primitive(typed, array.values(), levels)
                }
            }
        }
        ColumnWriter::BoolColumnWriter(ref mut typed) => {
            let array = column.as_boolean();
            typed.write_batch(
                get_bool_array_slice(array, indices).as_slice(),
                levels.def_levels(),
                levels.rep_levels(),
            )
        }
        ColumnWriter::Int64ColumnWriter(ref mut typed) => {
            match column.data_type() {
                ArrowDataType::Date64 => {
                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;

                    let array = array.as_primitive::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Int64 => {
                    let array = column.as_primitive::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::UInt64 => {
                    let values = column.as_primitive::<UInt64Type>().values();
                    // follow C++ implementation and use overflow/reinterpret cast from  u64 to i64 which will map
                    // `(i64::MAX as u64)..u64::MAX` to `i64::MIN..0`
                    let array = values.inner().typed_data::<i64>();
                    write_primitive(typed, array, levels)
                }
                ArrowDataType::Decimal128(_, _) => {
                    // use the int64 to represent the decimal with low precision
                    let array = column
                        .as_primitive::<Decimal128Type>()
                        .unary::<_, Int64Type>(|v| v as i64);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal256(_, _) => {
                    // use the int64 to represent the decimal with low precision
                    let array = column
                        .as_primitive::<Decimal256Type>()
                        .unary::<_, Int64Type>(|v| v.as_i128() as i64);
                    write_primitive(typed, array.values(), levels)
                }
                // Dictionary-encoded decimals: materialize, then narrow to i64.
                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                    ArrowDataType::Decimal128(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal128Type>()
                            .unary::<_, Int64Type>(|v| v as i64);
                        write_primitive(typed, array.values(), levels)
                    }
                    ArrowDataType::Decimal256(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal256Type>()
                            .unary::<_, Int64Type>(|v| v.as_i128() as i64);
                        write_primitive(typed, array.values(), levels)
                    }
                    _ => {
                        let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
                        let array = array.as_primitive::<Int64Type>();
                        write_primitive(typed, array.values(), levels)
                    }
                },
                // Any other Arrow type stored as INT64 is handled via cast.
                _ => {
                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
                    let array = array.as_primitive::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
            }
        }
        ColumnWriter::Int96ColumnWriter(ref mut _typed) => {
            unreachable!("Currently unreachable because data type not supported")
        }
        ColumnWriter::FloatColumnWriter(ref mut typed) => {
            let array = column.as_primitive::<Float32Type>();
            write_primitive(typed, array.values(), levels)
        }
        ColumnWriter::DoubleColumnWriter(ref mut typed) => {
            let array = column.as_primitive::<Float64Type>();
            write_primitive(typed, array.values(), levels)
        }
        ColumnWriter::ByteArrayColumnWriter(_) => {
            unreachable!("should use ByteArrayWriter")
        }
        // FIXED_LEN_BYTE_ARRAY: each Arrow value is converted to its
        // fixed-width byte representation before writing.
        ColumnWriter::FixedLenByteArrayColumnWriter(ref mut typed) => {
            let bytes = match column.data_type() {
                ArrowDataType::Interval(interval_unit) => match interval_unit {
                    IntervalUnit::YearMonth => {
                        let array = column
                            .as_any()
                            .downcast_ref::<arrow_array::IntervalYearMonthArray>()
                            .unwrap();
                        get_interval_ym_array_slice(array, indices)
                    }
                    IntervalUnit::DayTime => {
                        let array = column
                            .as_any()
                            .downcast_ref::<arrow_array::IntervalDayTimeArray>()
                            .unwrap();
                        get_interval_dt_array_slice(array, indices)
                    }
                    _ => {
                        return Err(ParquetError::NYI(
                            format!(
                                "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
                            )
                        ));
                    }
                },
                ArrowDataType::FixedSizeBinary(_) => {
                    let array = column
                        .as_any()
                        .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
                        .unwrap();
                    get_fsb_array_slice(array, indices)
                }
                ArrowDataType::Decimal128(_, _) => {
                    let array = column.as_primitive::<Decimal128Type>();
                    get_decimal_128_array_slice(array, indices)
                }
                ArrowDataType::Decimal256(_, _) => {
                    let array = column
                        .as_any()
                        .downcast_ref::<arrow_array::Decimal256Array>()
                        .unwrap();
                    get_decimal_256_array_slice(array, indices)
                }
                ArrowDataType::Float16 => {
                    let array = column.as_primitive::<Float16Type>();
                    get_float_16_array_slice(array, indices)
                }
                _ => {
                    return Err(ParquetError::NYI(
                        "Attempting to write an Arrow type that is not yet implemented".to_string(),
                    ));
                }
            };
            typed.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels())
        }
    }
}
1209
/// Writes a primitive column's values together with the definition and
/// repetition levels computed in `levels`, filtering to non-null positions.
fn write_primitive<E: ColumnValueEncoder>(
    writer: &mut GenericColumnWriter<E>,
    values: &E::Values,
    levels: &ArrayLevels,
) -> Result<usize> {
    // The trailing `None` arguments are optional inputs of
    // `write_batch_internal` (presumably precomputed statistics/counts) that
    // are deliberately left unset here — see that method for their meaning.
    writer.write_batch_internal(
        values,
        Some(levels.non_null_indices()),
        levels.def_levels(),
        levels.rep_levels(),
        None,
        None,
        None,
    )
}
1225
1226fn get_bool_array_slice(array: &arrow_array::BooleanArray, indices: &[usize]) -> Vec<bool> {
1227    let mut values = Vec::with_capacity(indices.len());
1228    for i in indices {
1229        values.push(array.value(*i))
1230    }
1231    values
1232}
1233
1234/// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each).
1235/// An Arrow YearMonth interval only stores months, thus only the first 4 bytes are populated.
1236fn get_interval_ym_array_slice(
1237    array: &arrow_array::IntervalYearMonthArray,
1238    indices: &[usize],
1239) -> Vec<FixedLenByteArray> {
1240    let mut values = Vec::with_capacity(indices.len());
1241    for i in indices {
1242        let mut value = array.value(*i).to_le_bytes().to_vec();
1243        let mut suffix = vec![0; 8];
1244        value.append(&mut suffix);
1245        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1246    }
1247    values
1248}
1249
1250/// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each).
1251/// An Arrow DayTime interval only stores days and millis, thus the first 4 bytes are not populated.
1252fn get_interval_dt_array_slice(
1253    array: &arrow_array::IntervalDayTimeArray,
1254    indices: &[usize],
1255) -> Vec<FixedLenByteArray> {
1256    let mut values = Vec::with_capacity(indices.len());
1257    for i in indices {
1258        let mut out = [0; 12];
1259        let value = array.value(*i);
1260        out[4..8].copy_from_slice(&value.days.to_le_bytes());
1261        out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1262        values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1263    }
1264    values
1265}
1266
1267fn get_decimal_128_array_slice(
1268    array: &arrow_array::Decimal128Array,
1269    indices: &[usize],
1270) -> Vec<FixedLenByteArray> {
1271    let mut values = Vec::with_capacity(indices.len());
1272    let size = decimal_length_from_precision(array.precision());
1273    for i in indices {
1274        let as_be_bytes = array.value(*i).to_be_bytes();
1275        let resized_value = as_be_bytes[(16 - size)..].to_vec();
1276        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1277    }
1278    values
1279}
1280
1281fn get_decimal_256_array_slice(
1282    array: &arrow_array::Decimal256Array,
1283    indices: &[usize],
1284) -> Vec<FixedLenByteArray> {
1285    let mut values = Vec::with_capacity(indices.len());
1286    let size = decimal_length_from_precision(array.precision());
1287    for i in indices {
1288        let as_be_bytes = array.value(*i).to_be_bytes();
1289        let resized_value = as_be_bytes[(32 - size)..].to_vec();
1290        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1291    }
1292    values
1293}
1294
1295fn get_float_16_array_slice(
1296    array: &arrow_array::Float16Array,
1297    indices: &[usize],
1298) -> Vec<FixedLenByteArray> {
1299    let mut values = Vec::with_capacity(indices.len());
1300    for i in indices {
1301        let value = array.value(*i).to_le_bytes().to_vec();
1302        values.push(FixedLenByteArray::from(ByteArray::from(value)));
1303    }
1304    values
1305}
1306
1307fn get_fsb_array_slice(
1308    array: &arrow_array::FixedSizeBinaryArray,
1309    indices: &[usize],
1310) -> Vec<FixedLenByteArray> {
1311    let mut values = Vec::with_capacity(indices.len());
1312    for i in indices {
1313        let value = array.value(*i).to_vec();
1314        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1315    }
1316    values
1317}
1318
1319#[cfg(test)]
1320mod tests {
1321    use super::*;
1322
1323    use std::fs::File;
1324
1325    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1326    use crate::arrow::ARROW_SCHEMA_META_KEY;
1327    use arrow::datatypes::ToByteSlice;
1328    use arrow::datatypes::{DataType, Schema};
1329    use arrow::error::Result as ArrowResult;
1330    use arrow::util::data_gen::create_random_array;
1331    use arrow::util::pretty::pretty_format_batches;
1332    use arrow::{array::*, buffer::Buffer};
1333    use arrow_buffer::{i256, IntervalDayTime, IntervalMonthDayNano, NullBuffer};
1334    use arrow_schema::Fields;
1335    use half::f16;
1336
1337    use crate::basic::Encoding;
1338    use crate::data_type::AsBytes;
1339    use crate::file::metadata::ParquetMetaData;
1340    use crate::file::page_index::index::Index;
1341    use crate::file::page_index::index_reader::read_offset_indexes;
1342    use crate::file::properties::{
1343        BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1344    };
1345    use crate::file::serialized_reader::ReadOptionsBuilder;
1346    use crate::file::{
1347        reader::{FileReader, SerializedFileReader},
1348        statistics::Statistics,
1349    };
1350
1351    #[test]
1352    fn arrow_writer() {
1353        // define schema
1354        let schema = Schema::new(vec![
1355            Field::new("a", DataType::Int32, false),
1356            Field::new("b", DataType::Int32, true),
1357        ]);
1358
1359        // create some data
1360        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1361        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1362
1363        // build a record batch
1364        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
1365
1366        roundtrip(batch, Some(SMALL_SIZE / 2));
1367    }
1368
1369    fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1370        let mut buffer = vec![];
1371
1372        let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
1373        writer.write(expected_batch).unwrap();
1374        writer.close().unwrap();
1375
1376        buffer
1377    }
1378
1379    fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1380        let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
1381        writer.write(expected_batch).unwrap();
1382        writer.into_inner().unwrap()
1383    }
1384
1385    #[test]
1386    fn roundtrip_bytes() {
1387        // define schema
1388        let schema = Arc::new(Schema::new(vec![
1389            Field::new("a", DataType::Int32, false),
1390            Field::new("b", DataType::Int32, true),
1391        ]));
1392
1393        // create some data
1394        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1395        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1396
1397        // build a record batch
1398        let expected_batch =
1399            RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
1400
1401        for buffer in [
1402            get_bytes_after_close(schema.clone(), &expected_batch),
1403            get_bytes_by_into_inner(schema, &expected_batch),
1404        ] {
1405            let cursor = Bytes::from(buffer);
1406            let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
1407
1408            let actual_batch = record_batch_reader
1409                .next()
1410                .expect("No batch found")
1411                .expect("Unable to get batch");
1412
1413            assert_eq!(expected_batch.schema(), actual_batch.schema());
1414            assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
1415            assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
1416            for i in 0..expected_batch.num_columns() {
1417                let expected_data = expected_batch.column(i).to_data();
1418                let actual_data = actual_batch.column(i).to_data();
1419
1420                assert_eq!(expected_data, actual_data);
1421            }
1422        }
1423    }
1424
1425    #[test]
1426    fn arrow_writer_non_null() {
1427        // define schema
1428        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1429
1430        // create some data
1431        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1432
1433        // build a record batch
1434        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1435
1436        roundtrip(batch, Some(SMALL_SIZE / 2));
1437    }
1438
    #[test]
    fn arrow_writer_list() {
        // define schema: a single nullable List<Int32> column
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
            true,
        )]);

        // create some data
        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

        // Construct a buffer for value offsets, for the nested array:
        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());

        // Construct a list array from the above two
        // (the validity bitmap 0b00011011 marks element 2 — bit not set — as null)
        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
            DataType::Int32,
            false,
        ))))
        .len(5)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .null_bit_buffer(Some(Buffer::from([0b00011011])))
        .build()
        .unwrap();
        let a = ListArray::from(a_list_data);

        // build a record batch
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        assert_eq!(batch.column(0).null_count(), 1);

        // This test fails if the max row group size is less than the batch's length
        // see https://github.com/apache/arrow-rs/issues/518
        roundtrip(batch, None);
    }
1477
    #[test]
    fn arrow_writer_list_non_null() {
        // define schema: a single required (non-nullable) List<Int32> column
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
            false,
        )]);

        // create some data
        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

        // Construct a buffer for value offsets, for the nested array:
        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
        // (offsets 3,3 produce an empty — not null — third list)
        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());

        // Construct a list array from the above two
        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
            DataType::Int32,
            false,
        ))))
        .len(5)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();
        let a = ListArray::from(a_list_data);

        // build a record batch
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        // This test fails if the max row group size is less than the batch's length
        // see https://github.com/apache/arrow-rs/issues/518
        assert_eq!(batch.column(0).null_count(), 0);

        roundtrip(batch, None);
    }
1515
1516    #[test]
1517    fn arrow_writer_binary() {
1518        let string_field = Field::new("a", DataType::Utf8, false);
1519        let binary_field = Field::new("b", DataType::Binary, false);
1520        let schema = Schema::new(vec![string_field, binary_field]);
1521
1522        let raw_string_values = vec!["foo", "bar", "baz", "quux"];
1523        let raw_binary_values = [
1524            b"foo".to_vec(),
1525            b"bar".to_vec(),
1526            b"baz".to_vec(),
1527            b"quux".to_vec(),
1528        ];
1529        let raw_binary_value_refs = raw_binary_values
1530            .iter()
1531            .map(|x| x.as_slice())
1532            .collect::<Vec<_>>();
1533
1534        let string_values = StringArray::from(raw_string_values.clone());
1535        let binary_values = BinaryArray::from(raw_binary_value_refs);
1536        let batch = RecordBatch::try_new(
1537            Arc::new(schema),
1538            vec![Arc::new(string_values), Arc::new(binary_values)],
1539        )
1540        .unwrap();
1541
1542        roundtrip(batch, Some(SMALL_SIZE / 2));
1543    }
1544
1545    #[test]
1546    fn arrow_writer_binary_view() {
1547        let string_field = Field::new("a", DataType::Utf8View, false);
1548        let binary_field = Field::new("b", DataType::BinaryView, false);
1549        let nullable_string_field = Field::new("a", DataType::Utf8View, true);
1550        let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
1551
1552        let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
1553        let raw_binary_values = vec![
1554            b"foo".to_vec(),
1555            b"bar".to_vec(),
1556            b"large payload over 12 bytes".to_vec(),
1557            b"lulu".to_vec(),
1558        ];
1559        let nullable_string_values =
1560            vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
1561
1562        let string_view_values = StringViewArray::from(raw_string_values);
1563        let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
1564        let nullable_string_view_values = StringViewArray::from(nullable_string_values);
1565        let batch = RecordBatch::try_new(
1566            Arc::new(schema),
1567            vec![
1568                Arc::new(string_view_values),
1569                Arc::new(binary_view_values),
1570                Arc::new(nullable_string_view_values),
1571            ],
1572        )
1573        .unwrap();
1574
1575        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1576        roundtrip(batch, None);
1577    }
1578
1579    fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
1580        let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
1581        let schema = Schema::new(vec![decimal_field]);
1582
1583        let decimal_values = vec![10_000, 50_000, 0, -100]
1584            .into_iter()
1585            .map(Some)
1586            .collect::<Decimal128Array>()
1587            .with_precision_and_scale(precision, scale)
1588            .unwrap();
1589
1590        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
1591    }
1592
1593    #[test]
1594    fn arrow_writer_decimal() {
1595        // int32 to store the decimal value
1596        let batch_int32_decimal = get_decimal_batch(5, 2);
1597        roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
1598        // int64 to store the decimal value
1599        let batch_int64_decimal = get_decimal_batch(12, 2);
1600        roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
1601        // fixed_length_byte_array to store the decimal value
1602        let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
1603        roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
1604    }
1605
    #[test]
    fn arrow_writer_complex() {
        // Roundtrip of a deeply nested schema: top-level primitives, a struct
        // containing another struct, and lists with different nullability at
        // every level.
        // define schema
        let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
        let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
        let struct_field_g = Arc::new(Field::new_list(
            "g",
            Field::new_list_field(DataType::Int16, true),
            false,
        ));
        let struct_field_h = Arc::new(Field::new_list(
            "h",
            Field::new_list_field(DataType::Int16, false),
            true,
        ));
        let struct_field_e = Arc::new(Field::new_struct(
            "e",
            vec![
                struct_field_f.clone(),
                struct_field_g.clone(),
                struct_field_h.clone(),
            ],
            false,
        ));
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, true),
            Field::new_struct(
                "c",
                vec![struct_field_d.clone(), struct_field_e.clone()],
                false,
            ),
        ]);

        // create some data
        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);

        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

        // Construct a buffer for value offsets, for the nested array:
        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
        let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());

        // Construct a list array from the above two
        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
            .len(5)
            .add_buffer(g_value_offsets.clone())
            .add_child_data(g_value.to_data())
            .build()
            .unwrap();
        let g = ListArray::from(g_list_data);
        // The difference between g and h is that h has a null bitmap
        // (validity 0b00011011: of h's five lists, only index 2 is null).
        let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
            .len(5)
            .add_buffer(g_value_offsets)
            .add_child_data(g_value.to_data())
            .null_bit_buffer(Some(Buffer::from([0b00011011])))
            .build()
            .unwrap();
        let h = ListArray::from(h_list_data);

        let e = StructArray::from(vec![
            (struct_field_f, Arc::new(f) as ArrayRef),
            (struct_field_g, Arc::new(g) as ArrayRef),
            (struct_field_h, Arc::new(h) as ArrayRef),
        ]);

        let c = StructArray::from(vec![
            (struct_field_d, Arc::new(d) as ArrayRef),
            (struct_field_e, Arc::new(e) as ArrayRef),
        ]);

        // build a record batch
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
        )
        .unwrap();

        // Two different row group sizes so rows split across row group
        // boundaries in different places.
        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
        roundtrip(batch, Some(SMALL_SIZE / 3));
    }
1691
1692    #[test]
1693    fn arrow_writer_complex_mixed() {
1694        // This test was added while investigating https://github.com/apache/arrow-rs/issues/244.
1695        // It was subsequently fixed while investigating https://github.com/apache/arrow-rs/issues/245.
1696
1697        // define schema
1698        let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
1699        let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
1700        let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
1701        let schema = Schema::new(vec![Field::new(
1702            "some_nested_object",
1703            DataType::Struct(Fields::from(vec![
1704                offset_field.clone(),
1705                partition_field.clone(),
1706                topic_field.clone(),
1707            ])),
1708            false,
1709        )]);
1710
1711        // create some data
1712        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1713        let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1714        let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
1715
1716        let some_nested_object = StructArray::from(vec![
1717            (offset_field, Arc::new(offset) as ArrayRef),
1718            (partition_field, Arc::new(partition) as ArrayRef),
1719            (topic_field, Arc::new(topic) as ArrayRef),
1720        ]);
1721
1722        // build a record batch
1723        let batch =
1724            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1725
1726        roundtrip(batch, Some(SMALL_SIZE / 2));
1727    }
1728
    #[test]
    fn arrow_writer_map() {
        // Roundtrips a Map column (string key -> nullable string value).
        // Note: we are using the JSON Arrow reader for brevity
        // NOTE(review): the second row repeats the "long" key — presumably
        // intentional to exercise the JSON reader's duplicate-key handling;
        // confirm.
        let json_content = r#"
        {"stocks":{"long": "$AAA", "short": "$BBB"}}
        {"stocks":{"long": null, "long": "$CCC", "short": null}}
        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
        "#;
        // Map entries are a (key, value) struct; keys are required,
        // values are nullable.
        let entries_struct_type = DataType::Struct(Fields::from(vec![
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Utf8, true),
        ]));
        let stocks_field = Field::new(
            "stocks",
            DataType::Map(
                Arc::new(Field::new("entries", entries_struct_type, false)),
                false, // keys are not sorted
            ),
            true,
        );
        let schema = Arc::new(Schema::new(vec![stocks_field]));
        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();

        let batch = reader.next().unwrap().unwrap();
        roundtrip(batch, None);
    }
1756
    #[test]
    fn arrow_writer_2_level_struct() {
        // tests writing <struct<struct<primitive>>
        // All three levels are nullable; validity bitmaps are set explicitly
        // so nulls appear at different depths.
        let field_c = Field::new("c", DataType::Int32, true);
        let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
        let type_a = DataType::Struct(vec![field_b.clone()].into());
        let field_a = Field::new("a", type_a, true);
        let schema = Schema::new(vec![field_a.clone()]);

        // create data
        let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
        // b is null at rows 3 and 4 (validity 0b00100111)
        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100111])))
            .add_child_data(c.into_data())
            .build()
            .unwrap();
        let b = StructArray::from(b_data);
        // a is null at row 4 only (validity 0b00101111)
        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00101111])))
            .add_child_data(b.into_data())
            .build()
            .unwrap();
        let a = StructArray::from(a_data);

        assert_eq!(a.null_count(), 1);
        assert_eq!(a.column(0).null_count(), 2);

        // build a record batch
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }
1791
    #[test]
    fn arrow_writer_2_level_struct_non_null() {
        // tests writing <struct<struct<primitive>>
        // Every level is non-nullable, so no validity bitmaps are needed.
        let field_c = Field::new("c", DataType::Int32, false);
        let type_b = DataType::Struct(vec![field_c].into());
        let field_b = Field::new("b", type_b.clone(), false);
        let type_a = DataType::Struct(vec![field_b].into());
        let field_a = Field::new("a", type_a.clone(), false);
        let schema = Schema::new(vec![field_a]);

        // create data
        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
        let b_data = ArrayDataBuilder::new(type_b)
            .len(6)
            .add_child_data(c.into_data())
            .build()
            .unwrap();
        let b = StructArray::from(b_data);
        let a_data = ArrayDataBuilder::new(type_a)
            .len(6)
            .add_child_data(b.into_data())
            .build()
            .unwrap();
        let a = StructArray::from(a_data);

        assert_eq!(a.null_count(), 0);
        assert_eq!(a.column(0).null_count(), 0);

        // build a record batch
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }
1825
    #[test]
    fn arrow_writer_2_level_struct_mixed_null() {
        // tests writing <struct<struct<primitive>>
        // Only the middle level (b) is nullable.
        let field_c = Field::new("c", DataType::Int32, false);
        let type_b = DataType::Struct(vec![field_c].into());
        let field_b = Field::new("b", type_b.clone(), true);
        let type_a = DataType::Struct(vec![field_b].into());
        let field_a = Field::new("a", type_a.clone(), false);
        let schema = Schema::new(vec![field_a]);

        // create data
        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
        // b is null at rows 3 and 4 (validity 0b00100111)
        let b_data = ArrayDataBuilder::new(type_b)
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100111])))
            .add_child_data(c.into_data())
            .build()
            .unwrap();
        let b = StructArray::from(b_data);
        // a intentionally has no null buffer, to test that this is handled correctly
        let a_data = ArrayDataBuilder::new(type_a)
            .len(6)
            .add_child_data(b.into_data())
            .build()
            .unwrap();
        let a = StructArray::from(a_data);

        assert_eq!(a.null_count(), 0);
        assert_eq!(a.column(0).null_count(), 2);

        // build a record batch
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }
1861
    #[test]
    fn arrow_writer_2_level_struct_mixed_null_2() {
        // tests writing <struct<struct<primitive>>, where the primitive columns are non-null.
        // The outer struct (a) is the only nullable level; its children
        // include a fixed-size binary and a dictionary column.
        let field_c = Field::new("c", DataType::Int32, false);
        let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
        let field_e = Field::new(
            "e",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            false,
        );

        let field_b = Field::new(
            "b",
            DataType::Struct(vec![field_c, field_d, field_e].into()),
            false,
        );
        let type_a = DataType::Struct(vec![field_b.clone()].into());
        let field_a = Field::new("a", type_a, true);
        let schema = Schema::new(vec![field_a.clone()]);

        // create data
        let c = Int32Array::from_iter_values(0..6);
        let d = FixedSizeBinaryArray::try_from_iter(
            ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
        )
        .expect("four byte values");
        let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
            .len(6)
            .add_child_data(c.into_data())
            .add_child_data(d.into_data())
            .add_child_data(e.into_data())
            .build()
            .unwrap();
        let b = StructArray::from(b_data);
        // a is null at rows 1, 3 and 4 (validity 0b00100101)
        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100101])))
            .add_child_data(b.into_data())
            .build()
            .unwrap();
        let a = StructArray::from(a_data);

        assert_eq!(a.null_count(), 3);
        assert_eq!(a.column(0).null_count(), 0);

        // build a record batch
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }
1913
1914    #[test]
1915    fn test_empty_dict() {
1916        let struct_fields = Fields::from(vec![Field::new(
1917            "dict",
1918            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1919            false,
1920        )]);
1921
1922        let schema = Schema::new(vec![Field::new_struct(
1923            "struct",
1924            struct_fields.clone(),
1925            true,
1926        )]);
1927        let dictionary = Arc::new(DictionaryArray::new(
1928            Int32Array::new_null(5),
1929            Arc::new(StringArray::new_null(0)),
1930        ));
1931
1932        let s = StructArray::new(
1933            struct_fields,
1934            vec![dictionary],
1935            Some(NullBuffer::new_null(5)),
1936        );
1937
1938        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
1939        roundtrip(batch, None);
1940    }
1941    #[test]
1942    fn arrow_writer_page_size() {
1943        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
1944
1945        let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
1946
1947        // Generate an array of 10 unique 10 character string
1948        for i in 0..10 {
1949            let value = i
1950                .to_string()
1951                .repeat(10)
1952                .chars()
1953                .take(10)
1954                .collect::<String>();
1955
1956            builder.append_value(value);
1957        }
1958
1959        let array = Arc::new(builder.finish());
1960
1961        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
1962
1963        let file = tempfile::tempfile().unwrap();
1964
1965        // Set everything very low so we fallback to PLAIN encoding after the first row
1966        let props = WriterProperties::builder()
1967            .set_data_page_size_limit(1)
1968            .set_dictionary_page_size_limit(1)
1969            .set_write_batch_size(1)
1970            .build();
1971
1972        let mut writer =
1973            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
1974                .expect("Unable to write file");
1975        writer.write(&batch).unwrap();
1976        writer.close().unwrap();
1977
1978        let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
1979
1980        let column = reader.metadata().row_group(0).columns();
1981
1982        assert_eq!(column.len(), 1);
1983
1984        // We should write one row before falling back to PLAIN encoding so there should still be a
1985        // dictionary page.
1986        assert!(
1987            column[0].dictionary_page_offset().is_some(),
1988            "Expected a dictionary page"
1989        );
1990
1991        let offset_indexes = read_offset_indexes(&file, column).unwrap().unwrap();
1992
1993        let page_locations = offset_indexes[0].page_locations.clone();
1994
1995        // We should fallback to PLAIN encoding after the first row and our max page size is 1 bytes
1996        // so we expect one dictionary encoded page and then a page per row thereafter.
1997        assert_eq!(
1998            page_locations.len(),
1999            10,
2000            "Expected 9 pages but got {page_locations:#?}"
2001        );
2002    }
2003
2004    #[test]
2005    fn arrow_writer_float_nans() {
2006        let f16_field = Field::new("a", DataType::Float16, false);
2007        let f32_field = Field::new("b", DataType::Float32, false);
2008        let f64_field = Field::new("c", DataType::Float64, false);
2009        let schema = Schema::new(vec![f16_field, f32_field, f64_field]);
2010
2011        let f16_values = (0..MEDIUM_SIZE)
2012            .map(|i| {
2013                Some(if i % 2 == 0 {
2014                    f16::NAN
2015                } else {
2016                    f16::from_f32(i as f32)
2017                })
2018            })
2019            .collect::<Float16Array>();
2020
2021        let f32_values = (0..MEDIUM_SIZE)
2022            .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
2023            .collect::<Float32Array>();
2024
2025        let f64_values = (0..MEDIUM_SIZE)
2026            .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
2027            .collect::<Float64Array>();
2028
2029        let batch = RecordBatch::try_new(
2030            Arc::new(schema),
2031            vec![
2032                Arc::new(f16_values),
2033                Arc::new(f32_values),
2034                Arc::new(f64_values),
2035            ],
2036        )
2037        .unwrap();
2038
2039        roundtrip(batch, None);
2040    }
2041
    // Row counts used by the roundtrip helpers in this module.
    const SMALL_SIZE: usize = 7;
    const MEDIUM_SIZE: usize = 63;
2044
2045    fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<File> {
2046        let mut files = vec![];
2047        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2048            let mut props = WriterProperties::builder().set_writer_version(version);
2049
2050            if let Some(size) = max_row_group_size {
2051                props = props.set_max_row_group_size(size)
2052            }
2053
2054            let props = props.build();
2055            files.push(roundtrip_opts(&expected_batch, props))
2056        }
2057        files
2058    }
2059
    /// Writes `expected_batch` to a temp file using `props`, reads it back,
    /// and invokes `validate` with the expected and actual `ArrayData` of
    /// every column. Returns the temp file for further inspection.
    ///
    /// NOTE(review): only the first batch returned by the reader is checked;
    /// the batch size of 1024 is assumed to cover all rows — confirm for any
    /// caller writing more than 1024 rows.
    fn roundtrip_opts_with_array_validation<F>(
        expected_batch: &RecordBatch,
        props: WriterProperties,
        validate: F,
    ) -> File
    where
        F: Fn(&ArrayData, &ArrayData),
    {
        let file = tempfile::tempfile().unwrap();

        let mut writer = ArrowWriter::try_new(
            file.try_clone().unwrap(),
            expected_batch.schema(),
            Some(props),
        )
        .expect("Unable to write file");
        writer.write(expected_batch).unwrap();
        writer.close().unwrap();

        let mut record_batch_reader =
            ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();

        let actual_batch = record_batch_reader
            .next()
            .expect("No batch found")
            .expect("Unable to get batch");

        // Schema and shape must match before the per-column comparison.
        assert_eq!(expected_batch.schema(), actual_batch.schema());
        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
        for i in 0..expected_batch.num_columns() {
            let expected_data = expected_batch.column(i).to_data();
            let actual_data = actual_batch.column(i).to_data();
            validate(&expected_data, &actual_data);
        }

        file
    }
2098
2099    fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> File {
2100        roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
2101            a.validate_full().expect("valid expected data");
2102            b.validate_full().expect("valid actual data");
2103            assert_eq!(a, b)
2104        })
2105    }
2106
    // Options controlling `one_column_roundtrip_with_options`.
    struct RoundTripOptions {
        // The single column of data to write.
        values: ArrayRef,
        // Schema for the one-column batch.
        schema: SchemaRef,
        // Whether to enable bloom filters when writing.
        bloom_filter: bool,
        // Where bloom filters are placed in the written file.
        bloom_filter_position: BloomFilterPosition,
    }
2113
2114    impl RoundTripOptions {
2115        fn new(values: ArrayRef, nullable: bool) -> Self {
2116            let data_type = values.data_type().clone();
2117            let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
2118            Self {
2119                values,
2120                schema: Arc::new(schema),
2121                bloom_filter: false,
2122                bloom_filter_position: BloomFilterPosition::AfterRowGroup,
2123            }
2124        }
2125    }
2126
2127    fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<File> {
2128        one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
2129    }
2130
2131    fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<File> {
2132        let mut options = RoundTripOptions::new(values, false);
2133        options.schema = schema;
2134        one_column_roundtrip_with_options(options)
2135    }
2136
    /// Roundtrips a single-column batch across the cross-product of
    /// dictionary sizes, encodings valid for the column's data type,
    /// writer versions, and row group sizes.
    fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<File> {
        let RoundTripOptions {
            values,
            schema,
            bloom_filter,
            bloom_filter_position,
        } = options;

        // Choose the set of encodings to exercise based on the data type.
        let encodings = match values.data_type() {
            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
                vec![
                    Encoding::PLAIN,
                    Encoding::DELTA_BYTE_ARRAY,
                    Encoding::DELTA_LENGTH_BYTE_ARRAY,
                ]
            }
            DataType::Int64
            | DataType::Int32
            | DataType::Int16
            | DataType::Int8
            | DataType::UInt64
            | DataType::UInt32
            | DataType::UInt16
            | DataType::UInt8 => vec![
                Encoding::PLAIN,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
            DataType::Float32 | DataType::Float64 => {
                vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
            }
            _ => vec![Encoding::PLAIN],
        };

        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();

        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];

        let mut files = vec![];
        // dictionary_size == 0 disables dictionary encoding entirely.
        for dictionary_size in [0, 1, 1024] {
            for encoding in &encodings {
                for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
                    for row_group_size in row_group_sizes {
                        let props = WriterProperties::builder()
                            .set_writer_version(version)
                            .set_max_row_group_size(row_group_size)
                            .set_dictionary_enabled(dictionary_size != 0)
                            // NOTE(review): .max(1) presumably keeps the page
                            // size limit non-zero when dictionaries are
                            // disabled — confirm.
                            .set_dictionary_page_size_limit(dictionary_size.max(1))
                            .set_encoding(*encoding)
                            .set_bloom_filter_enabled(bloom_filter)
                            .set_bloom_filter_position(bloom_filter_position)
                            .build();

                        files.push(roundtrip_opts(&expected_batch, props))
                    }
                }
            }
        }
        files
    }
2197
2198    fn values_required<A, I>(iter: I) -> Vec<File>
2199    where
2200        A: From<Vec<I::Item>> + Array + 'static,
2201        I: IntoIterator,
2202    {
2203        let raw_values: Vec<_> = iter.into_iter().collect();
2204        let values = Arc::new(A::from(raw_values));
2205        one_column_roundtrip(values, false)
2206    }
2207
2208    fn values_optional<A, I>(iter: I) -> Vec<File>
2209    where
2210        A: From<Vec<Option<I::Item>>> + Array + 'static,
2211        I: IntoIterator,
2212    {
2213        let optional_raw_values: Vec<_> = iter
2214            .into_iter()
2215            .enumerate()
2216            .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
2217            .collect();
2218        let optional_values = Arc::new(A::from(optional_raw_values));
2219        one_column_roundtrip(optional_values, true)
2220    }
2221
    // Runs both the required and the optional roundtrip over the same values.
    fn required_and_optional<A, I>(iter: I)
    where
        A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
        I: IntoIterator + Clone,
    {
        values_required::<A, I>(iter.clone());
        values_optional::<A, I>(iter);
    }
2230
2231    fn check_bloom_filter<T: AsBytes>(
2232        files: Vec<File>,
2233        file_column: String,
2234        positive_values: Vec<T>,
2235        negative_values: Vec<T>,
2236    ) {
2237        files.into_iter().take(1).for_each(|file| {
2238            let file_reader = SerializedFileReader::new_with_options(
2239                file,
2240                ReadOptionsBuilder::new()
2241                    .with_reader_properties(
2242                        ReaderProperties::builder()
2243                            .set_read_bloom_filter(true)
2244                            .build(),
2245                    )
2246                    .build(),
2247            )
2248            .expect("Unable to open file as Parquet");
2249            let metadata = file_reader.metadata();
2250
2251            // Gets bloom filters from all row groups.
2252            let mut bloom_filters: Vec<_> = vec![];
2253            for (ri, row_group) in metadata.row_groups().iter().enumerate() {
2254                if let Some((column_index, _)) = row_group
2255                    .columns()
2256                    .iter()
2257                    .enumerate()
2258                    .find(|(_, column)| column.column_path().string() == file_column)
2259                {
2260                    let row_group_reader = file_reader
2261                        .get_row_group(ri)
2262                        .expect("Unable to read row group");
2263                    if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
2264                        bloom_filters.push(sbbf.clone());
2265                    } else {
2266                        panic!("No bloom filter for column named {file_column} found");
2267                    }
2268                } else {
2269                    panic!("No column named {file_column} found");
2270                }
2271            }
2272
2273            positive_values.iter().for_each(|value| {
2274                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2275                assert!(
2276                    found.is_some(),
2277                    "{}",
2278                    format!("Value {:?} should be in bloom filter", value.as_bytes())
2279                );
2280            });
2281
2282            negative_values.iter().for_each(|value| {
2283                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2284                assert!(
2285                    found.is_none(),
2286                    "{}",
2287                    format!("Value {:?} should not be in bloom filter", value.as_bytes())
2288                );
2289            });
2290        });
2291    }
2292
2293    #[test]
2294    fn all_null_primitive_single_column() {
2295        let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
2296        one_column_roundtrip(values, true);
2297    }
2298    #[test]
2299    fn null_single_column() {
2300        let values = Arc::new(NullArray::new(SMALL_SIZE));
2301        one_column_roundtrip(values, true);
2302        // null arrays are always nullable, a test with non-nullable nulls fails
2303    }
2304
2305    #[test]
2306    fn bool_single_column() {
2307        required_and_optional::<BooleanArray, _>(
2308            [true, false].iter().cycle().copied().take(SMALL_SIZE),
2309        );
2310    }
2311
2312    #[test]
2313    fn bool_large_single_column() {
2314        let values = Arc::new(
2315            [None, Some(true), Some(false)]
2316                .iter()
2317                .cycle()
2318                .copied()
2319                .take(200_000)
2320                .collect::<BooleanArray>(),
2321        );
2322        let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
2323        let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2324        let file = tempfile::tempfile().unwrap();
2325
2326        let mut writer =
2327            ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
2328                .expect("Unable to write file");
2329        writer.write(&expected_batch).unwrap();
2330        writer.close().unwrap();
2331    }
2332
2333    #[test]
2334    fn check_page_offset_index_with_nan() {
2335        let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
2336        let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
2337        let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2338
2339        let mut out = Vec::with_capacity(1024);
2340        let mut writer =
2341            ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
2342        writer.write(&batch).unwrap();
2343        let file_meta_data = writer.close().unwrap();
2344        for row_group in file_meta_data.row_groups {
2345            for column in row_group.columns {
2346                assert!(column.offset_index_offset.is_some());
2347                assert!(column.offset_index_length.is_some());
2348                assert!(column.column_index_offset.is_none());
2349                assert!(column.column_index_length.is_none());
2350            }
2351        }
2352    }
2353
2354    #[test]
2355    fn i8_single_column() {
2356        required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
2357    }
2358
2359    #[test]
2360    fn i16_single_column() {
2361        required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
2362    }
2363
2364    #[test]
2365    fn i32_single_column() {
2366        required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
2367    }
2368
2369    #[test]
2370    fn i64_single_column() {
2371        required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
2372    }
2373
2374    #[test]
2375    fn u8_single_column() {
2376        required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
2377    }
2378
2379    #[test]
2380    fn u16_single_column() {
2381        required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
2382    }
2383
2384    #[test]
2385    fn u32_single_column() {
2386        required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
2387    }
2388
2389    #[test]
2390    fn u64_single_column() {
2391        required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
2392    }
2393
2394    #[test]
2395    fn f32_single_column() {
2396        required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
2397    }
2398
2399    #[test]
2400    fn f64_single_column() {
2401        required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
2402    }
2403
    // The timestamp array types don't implement From<Vec<T>> because they need the timezone
    // argument, and they also don't support building from a Vec<Option<T>>, so call
    // one_column_roundtrip manually instead of calling required_and_optional for these tests.
2407
2408    #[test]
2409    fn timestamp_second_single_column() {
2410        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2411        let values = Arc::new(TimestampSecondArray::from(raw_values));
2412
2413        one_column_roundtrip(values, false);
2414    }
2415
2416    #[test]
2417    fn timestamp_millisecond_single_column() {
2418        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2419        let values = Arc::new(TimestampMillisecondArray::from(raw_values));
2420
2421        one_column_roundtrip(values, false);
2422    }
2423
2424    #[test]
2425    fn timestamp_microsecond_single_column() {
2426        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2427        let values = Arc::new(TimestampMicrosecondArray::from(raw_values));
2428
2429        one_column_roundtrip(values, false);
2430    }
2431
2432    #[test]
2433    fn timestamp_nanosecond_single_column() {
2434        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2435        let values = Arc::new(TimestampNanosecondArray::from(raw_values));
2436
2437        one_column_roundtrip(values, false);
2438    }
2439
2440    #[test]
2441    fn date32_single_column() {
2442        required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
2443    }
2444
2445    #[test]
2446    fn date64_single_column() {
2447        // Date64 must be a multiple of 86400000, see ARROW-10925
2448        required_and_optional::<Date64Array, _>(
2449            (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
2450        );
2451    }
2452
2453    #[test]
2454    fn time32_second_single_column() {
2455        required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
2456    }
2457
2458    #[test]
2459    fn time32_millisecond_single_column() {
2460        required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
2461    }
2462
2463    #[test]
2464    fn time64_microsecond_single_column() {
2465        required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
2466    }
2467
2468    #[test]
2469    fn time64_nanosecond_single_column() {
2470        required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
2471    }
2472
2473    #[test]
2474    #[should_panic(expected = "Converting Duration to parquet not supported")]
2475    fn duration_second_single_column() {
2476        required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
2477    }
2478
2479    #[test]
2480    #[should_panic(expected = "Converting Duration to parquet not supported")]
2481    fn duration_millisecond_single_column() {
2482        required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
2483    }
2484
2485    #[test]
2486    #[should_panic(expected = "Converting Duration to parquet not supported")]
2487    fn duration_microsecond_single_column() {
2488        required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
2489    }
2490
2491    #[test]
2492    #[should_panic(expected = "Converting Duration to parquet not supported")]
2493    fn duration_nanosecond_single_column() {
2494        required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
2495    }
2496
2497    #[test]
2498    fn interval_year_month_single_column() {
2499        required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
2500    }
2501
2502    #[test]
2503    fn interval_day_time_single_column() {
2504        required_and_optional::<IntervalDayTimeArray, _>(vec![
2505            IntervalDayTime::new(0, 1),
2506            IntervalDayTime::new(0, 3),
2507            IntervalDayTime::new(3, -2),
2508            IntervalDayTime::new(-200, 4),
2509        ]);
2510    }
2511
2512    #[test]
2513    #[should_panic(
2514        expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
2515    )]
2516    fn interval_month_day_nano_single_column() {
2517        required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
2518            IntervalMonthDayNano::new(0, 1, 5),
2519            IntervalMonthDayNano::new(0, 3, 2),
2520            IntervalMonthDayNano::new(3, -2, -5),
2521            IntervalMonthDayNano::new(-200, 4, -1),
2522        ]);
2523    }
2524
2525    #[test]
2526    fn binary_single_column() {
2527        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2528        let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2529        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2530
2531        // BinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2532        values_required::<BinaryArray, _>(many_vecs_iter);
2533    }
2534
2535    #[test]
2536    fn binary_view_single_column() {
2537        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2538        let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2539        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2540
2541        // BinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2542        values_required::<BinaryViewArray, _>(many_vecs_iter);
2543    }
2544
2545    #[test]
2546    fn i32_column_bloom_filter_at_end() {
2547        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2548        let mut options = RoundTripOptions::new(array, false);
2549        options.bloom_filter = true;
2550        options.bloom_filter_position = BloomFilterPosition::End;
2551
2552        let files = one_column_roundtrip_with_options(options);
2553        check_bloom_filter(
2554            files,
2555            "col".to_string(),
2556            (0..SMALL_SIZE as i32).collect(),
2557            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2558        );
2559    }
2560
2561    #[test]
2562    fn i32_column_bloom_filter() {
2563        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2564        let mut options = RoundTripOptions::new(array, false);
2565        options.bloom_filter = true;
2566
2567        let files = one_column_roundtrip_with_options(options);
2568        check_bloom_filter(
2569            files,
2570            "col".to_string(),
2571            (0..SMALL_SIZE as i32).collect(),
2572            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2573        );
2574    }
2575
2576    #[test]
2577    fn binary_column_bloom_filter() {
2578        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2579        let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2580        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2581
2582        let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
2583        let mut options = RoundTripOptions::new(array, false);
2584        options.bloom_filter = true;
2585
2586        let files = one_column_roundtrip_with_options(options);
2587        check_bloom_filter(
2588            files,
2589            "col".to_string(),
2590            many_vecs,
2591            vec![vec![(SMALL_SIZE + 1) as u8]],
2592        );
2593    }
2594
2595    #[test]
2596    fn empty_string_null_column_bloom_filter() {
2597        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2598        let raw_strs = raw_values.iter().map(|s| s.as_str());
2599
2600        let array = Arc::new(StringArray::from_iter_values(raw_strs));
2601        let mut options = RoundTripOptions::new(array, false);
2602        options.bloom_filter = true;
2603
2604        let files = one_column_roundtrip_with_options(options);
2605
2606        let optional_raw_values: Vec<_> = raw_values
2607            .iter()
2608            .enumerate()
2609            .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
2610            .collect();
2611        // For null slots, empty string should not be in bloom filter.
2612        check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
2613    }
2614
2615    #[test]
2616    fn large_binary_single_column() {
2617        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2618        let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2619        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2620
2621        // LargeBinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2622        values_required::<LargeBinaryArray, _>(many_vecs_iter);
2623    }
2624
2625    #[test]
2626    fn fixed_size_binary_single_column() {
2627        let mut builder = FixedSizeBinaryBuilder::new(4);
2628        builder.append_value(b"0123").unwrap();
2629        builder.append_null();
2630        builder.append_value(b"8910").unwrap();
2631        builder.append_value(b"1112").unwrap();
2632        let array = Arc::new(builder.finish());
2633
2634        one_column_roundtrip(array, true);
2635    }
2636
2637    #[test]
2638    fn string_single_column() {
2639        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2640        let raw_strs = raw_values.iter().map(|s| s.as_str());
2641
2642        required_and_optional::<StringArray, _>(raw_strs);
2643    }
2644
2645    #[test]
2646    fn large_string_single_column() {
2647        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2648        let raw_strs = raw_values.iter().map(|s| s.as_str());
2649
2650        required_and_optional::<LargeStringArray, _>(raw_strs);
2651    }
2652
2653    #[test]
2654    fn string_view_single_column() {
2655        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2656        let raw_strs = raw_values.iter().map(|s| s.as_str());
2657
2658        required_and_optional::<StringViewArray, _>(raw_strs);
2659    }
2660
2661    #[test]
2662    fn null_list_single_column() {
2663        let null_field = Field::new_list_field(DataType::Null, true);
2664        let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);
2665
2666        let schema = Schema::new(vec![list_field]);
2667
2668        // Build [[], null, [null, null]]
2669        let a_values = NullArray::new(2);
2670        let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
2671        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2672            DataType::Null,
2673            true,
2674        ))))
2675        .len(3)
2676        .add_buffer(a_value_offsets)
2677        .null_bit_buffer(Some(Buffer::from([0b00000101])))
2678        .add_child_data(a_values.into_data())
2679        .build()
2680        .unwrap();
2681
2682        let a = ListArray::from(a_list_data);
2683
2684        assert!(a.is_valid(0));
2685        assert!(!a.is_valid(1));
2686        assert!(a.is_valid(2));
2687
2688        assert_eq!(a.value(0).len(), 0);
2689        assert_eq!(a.value(2).len(), 2);
2690        assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);
2691
2692        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2693        roundtrip(batch, None);
2694    }
2695
2696    #[test]
2697    fn list_single_column() {
2698        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2699        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2700        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2701            DataType::Int32,
2702            false,
2703        ))))
2704        .len(5)
2705        .add_buffer(a_value_offsets)
2706        .null_bit_buffer(Some(Buffer::from([0b00011011])))
2707        .add_child_data(a_values.into_data())
2708        .build()
2709        .unwrap();
2710
2711        assert_eq!(a_list_data.null_count(), 1);
2712
2713        let a = ListArray::from(a_list_data);
2714        let values = Arc::new(a);
2715
2716        one_column_roundtrip(values, true);
2717    }
2718
2719    #[test]
2720    fn large_list_single_column() {
2721        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2722        let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
2723        let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
2724            "large_item",
2725            DataType::Int32,
2726            true,
2727        ))))
2728        .len(5)
2729        .add_buffer(a_value_offsets)
2730        .add_child_data(a_values.into_data())
2731        .null_bit_buffer(Some(Buffer::from([0b00011011])))
2732        .build()
2733        .unwrap();
2734
2735        // I think this setup is incorrect because this should pass
2736        assert_eq!(a_list_data.null_count(), 1);
2737
2738        let a = LargeListArray::from(a_list_data);
2739        let values = Arc::new(a);
2740
2741        one_column_roundtrip(values, true);
2742    }
2743
2744    #[test]
2745    fn list_nested_nulls() {
2746        use arrow::datatypes::Int32Type;
2747        let data = vec![
2748            Some(vec![Some(1)]),
2749            Some(vec![Some(2), Some(3)]),
2750            None,
2751            Some(vec![Some(4), Some(5), None]),
2752            Some(vec![None]),
2753            Some(vec![Some(6), Some(7)]),
2754        ];
2755
2756        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
2757        one_column_roundtrip(Arc::new(list), true);
2758
2759        let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
2760        one_column_roundtrip(Arc::new(list), true);
2761    }
2762
2763    #[test]
2764    fn struct_single_column() {
2765        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2766        let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
2767        let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
2768
2769        let values = Arc::new(s);
2770        one_column_roundtrip(values, false);
2771    }
2772
2773    #[test]
2774    fn list_and_map_coerced_names() {
2775        // Create map and list with non-Parquet naming
2776        let list_field =
2777            Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
2778        let map_field = Field::new_map(
2779            "my_map",
2780            "entries",
2781            Field::new("keys", DataType::Int32, false),
2782            Field::new("values", DataType::Int32, true),
2783            false,
2784            true,
2785        );
2786
2787        let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
2788        let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();
2789
2790        let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
2791
2792        // Write data to Parquet but coerce names to match spec
2793        let props = Some(WriterProperties::builder().set_coerce_types(true).build());
2794        let file = tempfile::tempfile().unwrap();
2795        let mut writer =
2796            ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();
2797
2798        let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
2799        writer.write(&batch).unwrap();
2800        let file_metadata = writer.close().unwrap();
2801
2802        // Coerced name of "item" should be "element"
2803        assert_eq!(file_metadata.schema[3].name, "element");
2804        // Coerced name of "entries" should be "key_value"
2805        assert_eq!(file_metadata.schema[5].name, "key_value");
2806        // Coerced name of "keys" should be "key"
2807        assert_eq!(file_metadata.schema[6].name, "key");
2808        // Coerced name of "values" should be "value"
2809        assert_eq!(file_metadata.schema[7].name, "value");
2810
2811        // Double check schema after reading from the file
2812        let reader = SerializedFileReader::new(file).unwrap();
2813        let file_schema = reader.metadata().file_metadata().schema();
2814        let fields = file_schema.get_fields();
2815        let list_field = &fields[0].get_fields()[0];
2816        assert_eq!(list_field.get_fields()[0].name(), "element");
2817        let map_field = &fields[1].get_fields()[0];
2818        assert_eq!(map_field.name(), "key_value");
2819        assert_eq!(map_field.get_fields()[0].name(), "key");
2820        assert_eq!(map_field.get_fields()[1].name(), "value");
2821    }
2822
2823    #[test]
2824    fn fallback_flush_data_page() {
2825        //tests if the Fallback::flush_data_page clears all buffers correctly
2826        let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
2827        let values = Arc::new(StringArray::from(raw_values));
2828        let encodings = vec![
2829            Encoding::DELTA_BYTE_ARRAY,
2830            Encoding::DELTA_LENGTH_BYTE_ARRAY,
2831        ];
2832        let data_type = values.data_type().clone();
2833        let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
2834        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2835
2836        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2837        let data_page_size_limit: usize = 32;
2838        let write_batch_size: usize = 16;
2839
2840        for encoding in &encodings {
2841            for row_group_size in row_group_sizes {
2842                let props = WriterProperties::builder()
2843                    .set_writer_version(WriterVersion::PARQUET_2_0)
2844                    .set_max_row_group_size(row_group_size)
2845                    .set_dictionary_enabled(false)
2846                    .set_encoding(*encoding)
2847                    .set_data_page_size_limit(data_page_size_limit)
2848                    .set_write_batch_size(write_batch_size)
2849                    .build();
2850
2851                roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
2852                    let string_array_a = StringArray::from(a.clone());
2853                    let string_array_b = StringArray::from(b.clone());
2854                    let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
2855                    let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
2856                    assert_eq!(
2857                        vec_a, vec_b,
2858                        "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
2859                    );
2860                });
2861            }
2862        }
2863    }
2864
2865    #[test]
2866    fn arrow_writer_string_dictionary() {
2867        // define schema
2868        #[allow(deprecated)]
2869        let schema = Arc::new(Schema::new(vec![Field::new_dict(
2870            "dictionary",
2871            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2872            true,
2873            42,
2874            true,
2875        )]));
2876
2877        // create some data
2878        let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
2879            .iter()
2880            .copied()
2881            .collect();
2882
2883        // build a record batch
2884        one_column_roundtrip_with_schema(Arc::new(d), schema);
2885    }
2886
2887    #[test]
2888    fn arrow_writer_primitive_dictionary() {
2889        // define schema
2890        #[allow(deprecated)]
2891        let schema = Arc::new(Schema::new(vec![Field::new_dict(
2892            "dictionary",
2893            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
2894            true,
2895            42,
2896            true,
2897        )]));
2898
2899        // create some data
2900        let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
2901        builder.append(12345678).unwrap();
2902        builder.append_null();
2903        builder.append(22345678).unwrap();
2904        builder.append(12345678).unwrap();
2905        let d = builder.finish();
2906
2907        one_column_roundtrip_with_schema(Arc::new(d), schema);
2908    }
2909
2910    #[test]
2911    fn arrow_writer_decimal128_dictionary() {
2912        let integers = vec![12345, 56789, 34567];
2913
2914        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
2915
2916        let values = Decimal128Array::from(integers.clone())
2917            .with_precision_and_scale(5, 2)
2918            .unwrap();
2919
2920        let array = DictionaryArray::new(keys, Arc::new(values));
2921        one_column_roundtrip(Arc::new(array.clone()), true);
2922
2923        let values = Decimal128Array::from(integers)
2924            .with_precision_and_scale(12, 2)
2925            .unwrap();
2926
2927        let array = array.with_values(Arc::new(values));
2928        one_column_roundtrip(Arc::new(array), true);
2929    }
2930
2931    #[test]
2932    fn arrow_writer_decimal256_dictionary() {
2933        let integers = vec![
2934            i256::from_i128(12345),
2935            i256::from_i128(56789),
2936            i256::from_i128(34567),
2937        ];
2938
2939        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
2940
2941        let values = Decimal256Array::from(integers.clone())
2942            .with_precision_and_scale(5, 2)
2943            .unwrap();
2944
2945        let array = DictionaryArray::new(keys, Arc::new(values));
2946        one_column_roundtrip(Arc::new(array.clone()), true);
2947
2948        let values = Decimal256Array::from(integers)
2949            .with_precision_and_scale(12, 2)
2950            .unwrap();
2951
2952        let array = array.with_values(Arc::new(values));
2953        one_column_roundtrip(Arc::new(array), true);
2954    }
2955
2956    #[test]
2957    fn arrow_writer_string_dictionary_unsigned_index() {
2958        // define schema
2959        #[allow(deprecated)]
2960        let schema = Arc::new(Schema::new(vec![Field::new_dict(
2961            "dictionary",
2962            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
2963            true,
2964            42,
2965            true,
2966        )]));
2967
2968        // create some data
2969        let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
2970            .iter()
2971            .copied()
2972            .collect();
2973
2974        one_column_roundtrip_with_schema(Arc::new(d), schema);
2975    }
2976
2977    #[test]
2978    fn u32_min_max() {
2979        // check values roundtrip through parquet
2980        let src = [
2981            u32::MIN,
2982            u32::MIN + 1,
2983            (i32::MAX as u32) - 1,
2984            i32::MAX as u32,
2985            (i32::MAX as u32) + 1,
2986            u32::MAX - 1,
2987            u32::MAX,
2988        ];
2989        let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
2990        let files = one_column_roundtrip(values, false);
2991
2992        for file in files {
2993            // check statistics are valid
2994            let reader = SerializedFileReader::new(file).unwrap();
2995            let metadata = reader.metadata();
2996
2997            let mut row_offset = 0;
2998            for row_group in metadata.row_groups() {
2999                assert_eq!(row_group.num_columns(), 1);
3000                let column = row_group.column(0);
3001
3002                let num_values = column.num_values() as usize;
3003                let src_slice = &src[row_offset..row_offset + num_values];
3004                row_offset += column.num_values() as usize;
3005
3006                let stats = column.statistics().unwrap();
3007                if let Statistics::Int32(stats) = stats {
3008                    assert_eq!(
3009                        *stats.min_opt().unwrap() as u32,
3010                        *src_slice.iter().min().unwrap()
3011                    );
3012                    assert_eq!(
3013                        *stats.max_opt().unwrap() as u32,
3014                        *src_slice.iter().max().unwrap()
3015                    );
3016                } else {
3017                    panic!("Statistics::Int32 missing")
3018                }
3019            }
3020        }
3021    }
3022
3023    #[test]
3024    fn u64_min_max() {
3025        // check values roundtrip through parquet
3026        let src = [
3027            u64::MIN,
3028            u64::MIN + 1,
3029            (i64::MAX as u64) - 1,
3030            i64::MAX as u64,
3031            (i64::MAX as u64) + 1,
3032            u64::MAX - 1,
3033            u64::MAX,
3034        ];
3035        let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
3036        let files = one_column_roundtrip(values, false);
3037
3038        for file in files {
3039            // check statistics are valid
3040            let reader = SerializedFileReader::new(file).unwrap();
3041            let metadata = reader.metadata();
3042
3043            let mut row_offset = 0;
3044            for row_group in metadata.row_groups() {
3045                assert_eq!(row_group.num_columns(), 1);
3046                let column = row_group.column(0);
3047
3048                let num_values = column.num_values() as usize;
3049                let src_slice = &src[row_offset..row_offset + num_values];
3050                row_offset += column.num_values() as usize;
3051
3052                let stats = column.statistics().unwrap();
3053                if let Statistics::Int64(stats) = stats {
3054                    assert_eq!(
3055                        *stats.min_opt().unwrap() as u64,
3056                        *src_slice.iter().min().unwrap()
3057                    );
3058                    assert_eq!(
3059                        *stats.max_opt().unwrap() as u64,
3060                        *src_slice.iter().max().unwrap()
3061                    );
3062                } else {
3063                    panic!("Statistics::Int64 missing")
3064                }
3065            }
3066        }
3067    }
3068
3069    #[test]
3070    fn statistics_null_counts_only_nulls() {
3071        // check that null-count statistics for "only NULL"-columns are correct
3072        let values = Arc::new(UInt64Array::from(vec![None, None]));
3073        let files = one_column_roundtrip(values, true);
3074
3075        for file in files {
3076            // check statistics are valid
3077            let reader = SerializedFileReader::new(file).unwrap();
3078            let metadata = reader.metadata();
3079            assert_eq!(metadata.num_row_groups(), 1);
3080            let row_group = metadata.row_group(0);
3081            assert_eq!(row_group.num_columns(), 1);
3082            let column = row_group.column(0);
3083            let stats = column.statistics().unwrap();
3084            assert_eq!(stats.null_count_opt(), Some(2));
3085        }
3086    }
3087
3088    #[test]
3089    fn test_list_of_struct_roundtrip() {
3090        // define schema
3091        let int_field = Field::new("a", DataType::Int32, true);
3092        let int_field2 = Field::new("b", DataType::Int32, true);
3093
3094        let int_builder = Int32Builder::with_capacity(10);
3095        let int_builder2 = Int32Builder::with_capacity(10);
3096
3097        let struct_builder = StructBuilder::new(
3098            vec![int_field, int_field2],
3099            vec![Box::new(int_builder), Box::new(int_builder2)],
3100        );
3101        let mut list_builder = ListBuilder::new(struct_builder);
3102
3103        // Construct the following array
3104        // [{a: 1, b: 2}], [], null, [null, null], [{a: null, b: 3}], [{a: 2, b: null}]
3105
3106        // [{a: 1, b: 2}]
3107        let values = list_builder.values();
3108        values
3109            .field_builder::<Int32Builder>(0)
3110            .unwrap()
3111            .append_value(1);
3112        values
3113            .field_builder::<Int32Builder>(1)
3114            .unwrap()
3115            .append_value(2);
3116        values.append(true);
3117        list_builder.append(true);
3118
3119        // []
3120        list_builder.append(true);
3121
3122        // null
3123        list_builder.append(false);
3124
3125        // [null, null]
3126        let values = list_builder.values();
3127        values
3128            .field_builder::<Int32Builder>(0)
3129            .unwrap()
3130            .append_null();
3131        values
3132            .field_builder::<Int32Builder>(1)
3133            .unwrap()
3134            .append_null();
3135        values.append(false);
3136        values
3137            .field_builder::<Int32Builder>(0)
3138            .unwrap()
3139            .append_null();
3140        values
3141            .field_builder::<Int32Builder>(1)
3142            .unwrap()
3143            .append_null();
3144        values.append(false);
3145        list_builder.append(true);
3146
3147        // [{a: null, b: 3}]
3148        let values = list_builder.values();
3149        values
3150            .field_builder::<Int32Builder>(0)
3151            .unwrap()
3152            .append_null();
3153        values
3154            .field_builder::<Int32Builder>(1)
3155            .unwrap()
3156            .append_value(3);
3157        values.append(true);
3158        list_builder.append(true);
3159
3160        // [{a: 2, b: null}]
3161        let values = list_builder.values();
3162        values
3163            .field_builder::<Int32Builder>(0)
3164            .unwrap()
3165            .append_value(2);
3166        values
3167            .field_builder::<Int32Builder>(1)
3168            .unwrap()
3169            .append_null();
3170        values.append(true);
3171        list_builder.append(true);
3172
3173        let array = Arc::new(list_builder.finish());
3174
3175        one_column_roundtrip(array, true);
3176    }
3177
3178    fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
3179        metadata.row_groups().iter().map(|x| x.num_rows()).collect()
3180    }
3181
3182    #[test]
3183    fn test_aggregates_records() {
3184        let arrays = [
3185            Int32Array::from((0..100).collect::<Vec<_>>()),
3186            Int32Array::from((0..50).collect::<Vec<_>>()),
3187            Int32Array::from((200..500).collect::<Vec<_>>()),
3188        ];
3189
3190        let schema = Arc::new(Schema::new(vec![Field::new(
3191            "int",
3192            ArrowDataType::Int32,
3193            false,
3194        )]));
3195
3196        let file = tempfile::tempfile().unwrap();
3197
3198        let props = WriterProperties::builder()
3199            .set_max_row_group_size(200)
3200            .build();
3201
3202        let mut writer =
3203            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
3204
3205        for array in arrays {
3206            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
3207            writer.write(&batch).unwrap();
3208        }
3209
3210        writer.close().unwrap();
3211
3212        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3213        assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);
3214
3215        let batches = builder
3216            .with_batch_size(100)
3217            .build()
3218            .unwrap()
3219            .collect::<ArrowResult<Vec<_>>>()
3220            .unwrap();
3221
3222        assert_eq!(batches.len(), 5);
3223        assert!(batches.iter().all(|x| x.num_columns() == 1));
3224
3225        let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3226
3227        assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
3228
3229        let values: Vec<_> = batches
3230            .iter()
3231            .flat_map(|x| {
3232                x.column(0)
3233                    .as_any()
3234                    .downcast_ref::<Int32Array>()
3235                    .unwrap()
3236                    .values()
3237                    .iter()
3238                    .cloned()
3239            })
3240            .collect();
3241
3242        let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
3243        assert_eq!(&values, &expected_values)
3244    }
3245
    #[test]
    fn complex_aggregate() {
        // Tests aggregating nested data
        //
        // Schema: struct_b (non-null) -> list (nullable) -> struct_a (nullable)
        // with leaves leaf_a (non-null Int32) and leaf_b (nullable Int32)
        let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
        let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
        let struct_a = Arc::new(Field::new(
            "struct_a",
            DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
            true,
        ));

        let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
        let struct_b = Arc::new(Field::new(
            "struct_b",
            DataType::Struct(vec![list_a.clone()].into()),
            false,
        ));

        let schema = Arc::new(Schema::new(vec![struct_b]));

        // create nested data
        let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
        let field_b_array =
            Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);

        let struct_a_array = StructArray::from(vec![
            (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
            (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
        ]);

        // Offsets [0,1,1,3,3,5] with validity [t,f,t,f,t] produce five lists:
        // [row0], null, [row1,row2], null, [row3,row4] of struct_a
        // (child row 5 is not referenced by any list in this batch)
        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
            .len(5)
            .add_buffer(Buffer::from_iter(vec![
                0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
            ]))
            .null_bit_buffer(Some(Buffer::from_iter(vec![
                true, false, true, false, true,
            ])))
            .child_data(vec![struct_a_array.into_data()])
            .build()
            .unwrap();

        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
        let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);

        let batch1 =
            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
                .unwrap();

        // Second batch: two non-null lists covering child rows 0..4 and 4..5
        let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
        let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);

        let struct_a_array = StructArray::from(vec![
            (field_a, Arc::new(field_a_array) as ArrayRef),
            (field_b, Arc::new(field_b_array) as ArrayRef),
        ]);

        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
            .len(2)
            .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
            .child_data(vec![struct_a_array.into_data()])
            .build()
            .unwrap();

        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
        let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);

        let batch2 =
            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
                .unwrap();

        let batches = &[batch1, batch2];

        // Verify data is as expected
        // (expected table is re-indented here so the test source can be
        // indented; trim each line before comparing)

        let expected = r#"
            +-------------------------------------------------------------------------------------------------------+
            | struct_b                                                                                              |
            +-------------------------------------------------------------------------------------------------------+
            | {list: [{leaf_a: 1, leaf_b: 1}]}                                                                      |
            | {list: }                                                                                              |
            | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]}                                               |
            | {list: }                                                                                              |
            | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]}                                                |
            | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
            | {list: [{leaf_a: 10, leaf_b: }]}                                                                      |
            +-------------------------------------------------------------------------------------------------------+
        "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");

        let actual = pretty_format_batches(batches).unwrap().to_string();
        assert_eq!(actual, expected);

        // Write data with a max row group size of 6 (7 rows total across batches)
        let file = tempfile::tempfile().unwrap();
        let props = WriterProperties::builder()
            .set_max_row_group_size(6)
            .build();

        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();

        for batch in batches {
            writer.write(batch).unwrap();
        }
        writer.close().unwrap();

        // Read Data
        // Should have written entire first batch and first row of second to the first row group
        // leaving a single row in the second row group

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);

        let batches = builder
            .with_batch_size(2)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        // Batches of 2 over row groups of 6 and 1 rows: 2, 2, 2, 1
        assert_eq!(batches.len(), 4);
        let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
        assert_eq!(&batch_counts, &[2, 2, 2, 1]);

        // Round-tripped data renders identically to the original
        let actual = pretty_format_batches(&batches).unwrap().to_string();
        assert_eq!(actual, expected);
    }
3373
3374    #[test]
3375    fn test_arrow_writer_metadata() {
3376        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3377        let file_schema = batch_schema.clone().with_metadata(
3378            vec![("foo".to_string(), "bar".to_string())]
3379                .into_iter()
3380                .collect(),
3381        );
3382
3383        let batch = RecordBatch::try_new(
3384            Arc::new(batch_schema),
3385            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3386        )
3387        .unwrap();
3388
3389        let mut buf = Vec::with_capacity(1024);
3390        let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
3391        writer.write(&batch).unwrap();
3392        writer.close().unwrap();
3393    }
3394
3395    #[test]
3396    fn test_arrow_writer_nullable() {
3397        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3398        let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
3399        let file_schema = Arc::new(file_schema);
3400
3401        let batch = RecordBatch::try_new(
3402            Arc::new(batch_schema),
3403            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3404        )
3405        .unwrap();
3406
3407        let mut buf = Vec::with_capacity(1024);
3408        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3409        writer.write(&batch).unwrap();
3410        writer.close().unwrap();
3411
3412        let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
3413        let back = read.next().unwrap().unwrap();
3414        assert_eq!(back.schema(), file_schema);
3415        assert_ne!(back.schema(), batch.schema());
3416        assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
3417    }
3418
3419    #[test]
3420    fn in_progress_accounting() {
3421        // define schema
3422        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3423
3424        // create some data
3425        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3426
3427        // build a record batch
3428        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
3429
3430        let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
3431
3432        // starts empty
3433        assert_eq!(writer.in_progress_size(), 0);
3434        assert_eq!(writer.in_progress_rows(), 0);
3435        assert_eq!(writer.memory_size(), 0);
3436        assert_eq!(writer.bytes_written(), 4); // Initial header
3437        writer.write(&batch).unwrap();
3438
3439        // updated on write
3440        let initial_size = writer.in_progress_size();
3441        assert!(initial_size > 0);
3442        assert_eq!(writer.in_progress_rows(), 5);
3443        let initial_memory = writer.memory_size();
3444        assert!(initial_memory > 0);
3445        // memory estimate is larger than estimated encoded size
3446        assert!(
3447            initial_size <= initial_memory,
3448            "{initial_size} <= {initial_memory}"
3449        );
3450
3451        // updated on second write
3452        writer.write(&batch).unwrap();
3453        assert!(writer.in_progress_size() > initial_size);
3454        assert_eq!(writer.in_progress_rows(), 10);
3455        assert!(writer.memory_size() > initial_memory);
3456        assert!(
3457            writer.in_progress_size() <= writer.memory_size(),
3458            "in_progress_size {} <= memory_size {}",
3459            writer.in_progress_size(),
3460            writer.memory_size()
3461        );
3462
3463        // in progress tracking is cleared, but the overall data written is updated
3464        let pre_flush_bytes_written = writer.bytes_written();
3465        writer.flush().unwrap();
3466        assert_eq!(writer.in_progress_size(), 0);
3467        assert_eq!(writer.memory_size(), 0);
3468        assert!(writer.bytes_written() > pre_flush_bytes_written);
3469
3470        writer.close().unwrap();
3471    }
3472
3473    #[test]
3474    fn test_writer_all_null() {
3475        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3476        let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
3477        let batch = RecordBatch::try_from_iter(vec![
3478            ("a", Arc::new(a) as ArrayRef),
3479            ("b", Arc::new(b) as ArrayRef),
3480        ])
3481        .unwrap();
3482
3483        let mut buf = Vec::with_capacity(1024);
3484        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
3485        writer.write(&batch).unwrap();
3486        writer.close().unwrap();
3487
3488        let bytes = Bytes::from(buf);
3489        let options = ReadOptionsBuilder::new().with_page_index().build();
3490        let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
3491        let index = reader.metadata().offset_index().unwrap();
3492
3493        assert_eq!(index.len(), 1);
3494        assert_eq!(index[0].len(), 2); // 2 columns
3495        assert_eq!(index[0][0].page_locations().len(), 1); // 1 page
3496        assert_eq!(index[0][1].page_locations().len(), 1); // 1 page
3497    }
3498
3499    #[test]
3500    fn test_disabled_statistics_with_page() {
3501        let file_schema = Schema::new(vec![
3502            Field::new("a", DataType::Utf8, true),
3503            Field::new("b", DataType::Utf8, true),
3504        ]);
3505        let file_schema = Arc::new(file_schema);
3506
3507        let batch = RecordBatch::try_new(
3508            file_schema.clone(),
3509            vec![
3510                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3511                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3512            ],
3513        )
3514        .unwrap();
3515
3516        let props = WriterProperties::builder()
3517            .set_statistics_enabled(EnabledStatistics::None)
3518            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
3519            .build();
3520
3521        let mut buf = Vec::with_capacity(1024);
3522        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
3523        writer.write(&batch).unwrap();
3524
3525        let metadata = writer.close().unwrap();
3526        assert_eq!(metadata.row_groups.len(), 1);
3527        let row_group = &metadata.row_groups[0];
3528        assert_eq!(row_group.columns.len(), 2);
3529        // Column "a" has both offset and column index, as requested
3530        assert!(row_group.columns[0].offset_index_offset.is_some());
3531        assert!(row_group.columns[0].column_index_offset.is_some());
3532        // Column "b" should only have offset index
3533        assert!(row_group.columns[1].offset_index_offset.is_some());
3534        assert!(row_group.columns[1].column_index_offset.is_none());
3535
3536        let options = ReadOptionsBuilder::new().with_page_index().build();
3537        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
3538
3539        let row_group = reader.get_row_group(0).unwrap();
3540        let a_col = row_group.metadata().column(0);
3541        let b_col = row_group.metadata().column(1);
3542
3543        // Column chunk of column "a" should have chunk level statistics
3544        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
3545            let min = byte_array_stats.min_opt().unwrap();
3546            let max = byte_array_stats.max_opt().unwrap();
3547
3548            assert_eq!(min.as_bytes(), b"a");
3549            assert_eq!(max.as_bytes(), b"d");
3550        } else {
3551            panic!("expecting Statistics::ByteArray");
3552        }
3553
3554        // The column chunk for column "b" shouldn't have statistics
3555        assert!(b_col.statistics().is_none());
3556
3557        let offset_index = reader.metadata().offset_index().unwrap();
3558        assert_eq!(offset_index.len(), 1); // 1 row group
3559        assert_eq!(offset_index[0].len(), 2); // 2 columns
3560
3561        let column_index = reader.metadata().column_index().unwrap();
3562        assert_eq!(column_index.len(), 1); // 1 row group
3563        assert_eq!(column_index[0].len(), 2); // 2 columns
3564
3565        let a_idx = &column_index[0][0];
3566        assert!(matches!(a_idx, Index::BYTE_ARRAY(_)), "{a_idx:?}");
3567        let b_idx = &column_index[0][1];
3568        assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
3569    }
3570
3571    #[test]
3572    fn test_disabled_statistics_with_chunk() {
3573        let file_schema = Schema::new(vec![
3574            Field::new("a", DataType::Utf8, true),
3575            Field::new("b", DataType::Utf8, true),
3576        ]);
3577        let file_schema = Arc::new(file_schema);
3578
3579        let batch = RecordBatch::try_new(
3580            file_schema.clone(),
3581            vec![
3582                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3583                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3584            ],
3585        )
3586        .unwrap();
3587
3588        let props = WriterProperties::builder()
3589            .set_statistics_enabled(EnabledStatistics::None)
3590            .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
3591            .build();
3592
3593        let mut buf = Vec::with_capacity(1024);
3594        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
3595        writer.write(&batch).unwrap();
3596
3597        let metadata = writer.close().unwrap();
3598        assert_eq!(metadata.row_groups.len(), 1);
3599        let row_group = &metadata.row_groups[0];
3600        assert_eq!(row_group.columns.len(), 2);
3601        // Column "a" should only have offset index
3602        assert!(row_group.columns[0].offset_index_offset.is_some());
3603        assert!(row_group.columns[0].column_index_offset.is_none());
3604        // Column "b" should only have offset index
3605        assert!(row_group.columns[1].offset_index_offset.is_some());
3606        assert!(row_group.columns[1].column_index_offset.is_none());
3607
3608        let options = ReadOptionsBuilder::new().with_page_index().build();
3609        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
3610
3611        let row_group = reader.get_row_group(0).unwrap();
3612        let a_col = row_group.metadata().column(0);
3613        let b_col = row_group.metadata().column(1);
3614
3615        // Column chunk of column "a" should have chunk level statistics
3616        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
3617            let min = byte_array_stats.min_opt().unwrap();
3618            let max = byte_array_stats.max_opt().unwrap();
3619
3620            assert_eq!(min.as_bytes(), b"a");
3621            assert_eq!(max.as_bytes(), b"d");
3622        } else {
3623            panic!("expecting Statistics::ByteArray");
3624        }
3625
3626        // The column chunk for column "b"  shouldn't have statistics
3627        assert!(b_col.statistics().is_none());
3628
3629        let column_index = reader.metadata().column_index().unwrap();
3630        assert_eq!(column_index.len(), 1); // 1 row group
3631        assert_eq!(column_index[0].len(), 2); // 2 columns
3632
3633        let a_idx = &column_index[0][0];
3634        assert!(matches!(a_idx, Index::NONE), "{a_idx:?}");
3635        let b_idx = &column_index[0][1];
3636        assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
3637    }
3638
3639    #[test]
3640    fn test_arrow_writer_skip_metadata() {
3641        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3642        let file_schema = Arc::new(batch_schema.clone());
3643
3644        let batch = RecordBatch::try_new(
3645            Arc::new(batch_schema),
3646            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3647        )
3648        .unwrap();
3649        let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
3650
3651        let mut buf = Vec::with_capacity(1024);
3652        let mut writer =
3653            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
3654        writer.write(&batch).unwrap();
3655        writer.close().unwrap();
3656
3657        let bytes = Bytes::from(buf);
3658        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
3659        assert_eq!(file_schema, *reader_builder.schema());
3660        if let Some(key_value_metadata) = reader_builder
3661            .metadata()
3662            .file_metadata()
3663            .key_value_metadata()
3664        {
3665            assert!(!key_value_metadata
3666                .iter()
3667                .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY));
3668        }
3669    }
3670
3671    #[test]
3672    fn mismatched_schemas() {
3673        let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
3674        let file_schema = Arc::new(Schema::new(vec![Field::new(
3675            "temperature",
3676            DataType::Float64,
3677            false,
3678        )]));
3679
3680        let batch = RecordBatch::try_new(
3681            Arc::new(batch_schema),
3682            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3683        )
3684        .unwrap();
3685
3686        let mut buf = Vec::with_capacity(1024);
3687        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3688
3689        let err = writer.write(&batch).unwrap_err().to_string();
3690        assert_eq!(
3691            err,
3692            "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
3693        );
3694    }
3695
3696    #[test]
3697    // https://github.com/apache/arrow-rs/issues/6988
3698    fn test_roundtrip_empty_schema() {
3699        // create empty record batch with empty schema
3700        let empty_batch = RecordBatch::try_new_with_options(
3701            Arc::new(Schema::empty()),
3702            vec![],
3703            &RecordBatchOptions::default().with_row_count(Some(0)),
3704        )
3705        .unwrap();
3706
3707        // write to parquet
3708        let mut parquet_bytes: Vec<u8> = Vec::new();
3709        let mut writer =
3710            ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
3711        writer.write(&empty_batch).unwrap();
3712        writer.close().unwrap();
3713
3714        // read from parquet
3715        let bytes = Bytes::from(parquet_bytes);
3716        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
3717        assert_eq!(reader.schema(), &empty_batch.schema());
3718        let batches: Vec<_> = reader
3719            .build()
3720            .unwrap()
3721            .collect::<ArrowResult<Vec<_>>>()
3722            .unwrap();
3723        assert_eq!(batches.len(), 0);
3724    }
3725}