parquet/arrow/arrow_writer/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains writer which writes arrow data into parquet data.
19
20use bytes::Bytes;
21use std::io::{Read, Write};
22use std::iter::Peekable;
23use std::slice::Iter;
24use std::sync::{Arc, Mutex};
25use std::vec::IntoIter;
26use thrift::protocol::TCompactOutputProtocol;
27
28use arrow_array::cast::AsArray;
29use arrow_array::types::*;
30use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
31use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};
32
33use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};
34
35use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
36use crate::arrow::ArrowSchemaConverter;
37use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
38use crate::column::page_encryption::PageEncryptor;
39use crate::column::writer::encoder::ColumnValueEncoder;
40use crate::column::writer::{
41    get_column_writer, ColumnCloseResult, ColumnWriter, GenericColumnWriter,
42};
43use crate::data_type::{ByteArray, FixedLenByteArray};
44#[cfg(feature = "encryption")]
45use crate::encryption::encrypt::FileEncryptor;
46use crate::errors::{ParquetError, Result};
47use crate::file::metadata::{KeyValue, RowGroupMetaData};
48use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
49use crate::file::reader::{ChunkReader, Length};
50use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
51use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
52use crate::thrift::TSerializable;
53use levels::{calculate_array_levels, ArrayLevels};
54
55mod byte_array;
56mod levels;
57
58/// Encodes [`RecordBatch`] to parquet
59///
60/// Writes Arrow `RecordBatch`es to a Parquet writer. Multiple [`RecordBatch`]es will be encoded
61/// into the same row group, up to `max_row_group_size` rows. Any remaining rows will be
62/// flushed on close, so the final row group in the output file may contain
63/// fewer than `max_row_group_size` rows.
64///
65/// # Example: Writing `RecordBatch`es
66/// ```
67/// # use std::sync::Arc;
68/// # use bytes::Bytes;
69/// # use arrow_array::{ArrayRef, Int64Array};
70/// # use arrow_array::RecordBatch;
71/// # use parquet::arrow::arrow_writer::ArrowWriter;
72/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
73/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
74/// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
75///
76/// let mut buffer = Vec::new();
77/// let mut writer = ArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
78/// writer.write(&to_write).unwrap();
79/// writer.close().unwrap();
80///
81/// let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap();
82/// let read = reader.next().unwrap().unwrap();
83///
84/// assert_eq!(to_write, read);
85/// ```
86///
87/// # Memory Usage and Limiting
88///
89/// The nature of Parquet requires buffering of an entire row group before it can
90/// be flushed to the underlying writer. Data is mostly buffered in its encoded
91/// form, reducing memory usage. However, some data such as dictionary keys,
92/// large strings or very nested data may still result in non-trivial memory
93/// usage.
94///
95/// See Also:
96/// * [`ArrowWriter::memory_size`]: the current memory usage of the writer.
97/// * [`ArrowWriter::in_progress_size`]: Estimated size of the buffered row group.
98///
99/// Call [`Self::flush`] to trigger an early flush of a row group based on a
100/// memory threshold and/or global memory pressure. However, smaller row groups
101/// result in higher metadata overheads, and thus may worsen compression ratios
102/// and query performance.
103///
104/// ```no_run
105/// # use std::io::Write;
106/// # use arrow_array::RecordBatch;
107/// # use parquet::arrow::ArrowWriter;
108/// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
109/// # let batch: RecordBatch = todo!();
110/// writer.write(&batch).unwrap();
111/// // Trigger an early flush if anticipated size exceeds 1_000_000
112/// if writer.in_progress_size() > 1_000_000 {
113///     writer.flush().unwrap();
114/// }
115/// ```
116///
117/// ## Type Support
118///
119/// The writer supports writing all Arrow [`DataType`]s that have a direct mapping to
120/// Parquet types, including [`StructArray`] and [`ListArray`].
121///
122/// The following are not supported:
123///
124/// * [`IntervalMonthDayNanoArray`]: Parquet does not [support nanosecond intervals].
125///
126/// [`DataType`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html
127/// [`StructArray`]: https://docs.rs/arrow/latest/arrow/array/struct.StructArray.html
128/// [`ListArray`]: https://docs.rs/arrow/latest/arrow/array/type.ListArray.html
129/// [`IntervalMonthDayNanoArray`]: https://docs.rs/arrow/latest/arrow/array/type.IntervalMonthDayNanoArray.html
130/// [support nanosecond intervals]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
131pub struct ArrowWriter<W: Write> {
132    /// Underlying Parquet writer
133    writer: SerializedFileWriter<W>,
134
135    /// The in-progress row group if any
136    in_progress: Option<ArrowRowGroupWriter>,
137
138    /// A copy of the Arrow schema.
139    ///
140    /// The schema is used to verify that each record batch written has the correct schema
141    arrow_schema: SchemaRef,
142
143    /// Creates new [`ArrowRowGroupWriter`] instances as required
144    row_group_writer_factory: ArrowRowGroupWriterFactory,
145
146    /// The maximum number of rows to write to each row group
147    max_row_group_size: usize,
148}
149
150impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
151    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152        let buffered_memory = self.in_progress_size();
153        f.debug_struct("ArrowWriter")
154            .field("writer", &self.writer)
155            .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
156            .field("in_progress_rows", &self.in_progress_rows())
157            .field("arrow_schema", &self.arrow_schema)
158            .field("max_row_group_size", &self.max_row_group_size)
159            .finish()
160    }
161}
162
163impl<W: Write + Send> ArrowWriter<W> {
164    /// Try to create a new Arrow writer
165    ///
166    /// The writer will fail if:
167    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
168    ///  * the Arrow schema contains unsupported datatypes such as Unions
169    pub fn try_new(
170        writer: W,
171        arrow_schema: SchemaRef,
172        props: Option<WriterProperties>,
173    ) -> Result<Self> {
174        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
175        Self::try_new_with_options(writer, arrow_schema, options)
176    }
177
178    /// Try to create a new Arrow writer with [`ArrowWriterOptions`].
179    ///
180    /// The writer will fail if:
181    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
182    ///  * the Arrow schema contains unsupported datatypes such as Unions
183    pub fn try_new_with_options(
184        writer: W,
185        arrow_schema: SchemaRef,
186        options: ArrowWriterOptions,
187    ) -> Result<Self> {
188        let mut props = options.properties;
189        let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
190        if let Some(schema_root) = &options.schema_root {
191            converter = converter.schema_root(schema_root);
192        }
193        let schema = converter.convert(&arrow_schema)?;
194        if !options.skip_arrow_metadata {
195            // add serialized arrow schema
196            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
197        }
198
199        let max_row_group_size = props.max_row_group_size();
200
201        let file_writer =
202            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::new(props))?;
203
204        let row_group_writer_factory = ArrowRowGroupWriterFactory::new(&file_writer);
205
206        Ok(Self {
207            writer: file_writer,
208            in_progress: None,
209            arrow_schema,
210            row_group_writer_factory,
211            max_row_group_size,
212        })
213    }
214
215    /// Returns metadata for any flushed row groups
216    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
217        self.writer.flushed_row_groups()
218    }
219
220    /// Estimated memory usage, in bytes, of this `ArrowWriter`
221    ///
222    /// This estimate is formed by summing the values of
223    /// [`ArrowColumnWriter::memory_size`] for all in-progress columns.
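    ///
    /// A minimal sketch (the 64 MiB threshold is purely illustrative) of using this
    /// estimate to bound the writer's buffered memory:
    ///
    /// ```no_run
    /// # use arrow_array::RecordBatch;
    /// # use parquet::arrow::ArrowWriter;
    /// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
    /// # let batch: RecordBatch = todo!();
    /// writer.write(&batch).unwrap();
    /// if writer.memory_size() > 64 * 1024 * 1024 {
    ///     writer.flush().unwrap();
    /// }
    /// ```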
224    pub fn memory_size(&self) -> usize {
225        match &self.in_progress {
226            Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
227            None => 0,
228        }
229    }
230
231    /// Anticipated encoded size of the in progress row group.
232    ///
233    /// This estimates the size of the row group after it has been completely
234    /// encoded, and is formed by summing the values of
235    /// [`ArrowColumnWriter::get_estimated_total_bytes`] for all in-progress
236    /// columns.
237    pub fn in_progress_size(&self) -> usize {
238        match &self.in_progress {
239            Some(in_progress) => in_progress
240                .writers
241                .iter()
242                .map(|x| x.get_estimated_total_bytes())
243                .sum(),
244            None => 0,
245        }
246    }
247
248    /// Returns the number of rows buffered in the in progress row group
249    pub fn in_progress_rows(&self) -> usize {
250        self.in_progress
251            .as_ref()
252            .map(|x| x.buffered_rows)
253            .unwrap_or_default()
254    }
255
256    /// Returns the number of bytes written by this instance
257    pub fn bytes_written(&self) -> usize {
258        self.writer.bytes_written()
259    }
260
261    /// Encodes the provided [`RecordBatch`]
262    ///
263    /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`]
264    /// rows, the contents of `batch` will be written to one or more row groups such that all but
265    /// the final row group in the file contain [`WriterProperties::max_row_group_size`] rows.
266    ///
267    /// This will fail if the `batch`'s schema does not match the writer's schema.
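    ///
    /// A minimal sketch (the row group size of 4 is purely illustrative) of how a
    /// batch larger than `max_row_group_size` is split across row groups:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    /// # use parquet::arrow::ArrowWriter;
    /// # use parquet::file::properties::WriterProperties;
    /// let col = Arc::new(Int32Array::from_iter_values(0..10)) as ArrayRef;
    /// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
    ///
    /// let props = WriterProperties::builder().set_max_row_group_size(4).build();
    /// let mut writer = ArrowWriter::try_new(Vec::new(), batch.schema(), Some(props)).unwrap();
    /// writer.write(&batch).unwrap();
    ///
    /// // 10 rows with at most 4 rows per row group yields 3 row groups
    /// let metadata = writer.close().unwrap();
    /// assert_eq!(metadata.row_groups.len(), 3);
    /// ```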
268    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
269        if batch.num_rows() == 0 {
270            return Ok(());
271        }
272
273        let in_progress = match &mut self.in_progress {
274            Some(in_progress) => in_progress,
275            x => x.insert(self.row_group_writer_factory.create_row_group_writer(
276                self.writer.schema_descr(),
277                self.writer.properties(),
278                &self.arrow_schema,
279                self.writer.flushed_row_groups().len(),
280            )?),
281        };
282
283        // If this would exceed max_row_group_size, split the batch
284        if in_progress.buffered_rows + batch.num_rows() > self.max_row_group_size {
285            let to_write = self.max_row_group_size - in_progress.buffered_rows;
286            let a = batch.slice(0, to_write);
287            let b = batch.slice(to_write, batch.num_rows() - to_write);
288            self.write(&a)?;
289            return self.write(&b);
290        }
291
292        in_progress.write(batch)?;
293
294        if in_progress.buffered_rows >= self.max_row_group_size {
295            self.flush()?
296        }
297        Ok(())
298    }
299
300    /// Flushes all buffered rows into a new row group
301    pub fn flush(&mut self) -> Result<()> {
302        let in_progress = match self.in_progress.take() {
303            Some(in_progress) => in_progress,
304            None => return Ok(()),
305        };
306
307        let mut row_group_writer = self.writer.next_row_group()?;
308        for chunk in in_progress.close()? {
309            chunk.append_to_row_group(&mut row_group_writer)?;
310        }
311        row_group_writer.close()?;
312        Ok(())
313    }
314
315    /// Appends additional [`KeyValue`] metadata to be written alongside that from [`WriterProperties`]
316    ///
317    /// This method provides a way to append key-value metadata after writing `RecordBatch`es.
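    ///
    /// A minimal sketch (the key and value shown are purely illustrative):
    ///
    /// ```no_run
    /// # use parquet::arrow::ArrowWriter;
    /// # use parquet::file::metadata::KeyValue;
    /// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
    /// writer.append_key_value_metadata(KeyValue::new("origin".to_string(), Some("example".to_string())));
    /// writer.finish().unwrap();
    /// ```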
318    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
319        self.writer.append_key_value_metadata(kv_metadata)
320    }
321
322    /// Returns a reference to the underlying writer.
323    pub fn inner(&self) -> &W {
324        self.writer.inner()
325    }
326
327    /// Returns a mutable reference to the underlying writer.
328    ///
329    /// It is inadvisable to write directly to the underlying writer, as doing so
330    /// will likely result in a corrupt Parquet file.
331    pub fn inner_mut(&mut self) -> &mut W {
332        self.writer.inner_mut()
333    }
334
335    /// Flushes any outstanding data and returns the underlying writer.
336    pub fn into_inner(mut self) -> Result<W> {
337        self.flush()?;
338        self.writer.into_inner()
339    }
340
341    /// Close and finalize the underlying Parquet writer
342    ///
343    /// Unlike [`Self::close`] this does not consume self
344    ///
345    /// Attempting to write after calling finish will result in an error
346    pub fn finish(&mut self) -> Result<crate::format::FileMetaData> {
347        self.flush()?;
348        self.writer.finish()
349    }
350
351    /// Close and finalize the underlying Parquet writer
352    pub fn close(mut self) -> Result<crate::format::FileMetaData> {
353        self.finish()
354    }
355}
356
357impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
358    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
359        self.write(batch).map_err(|e| e.into())
360    }
361
362    fn close(self) -> std::result::Result<(), ArrowError> {
363        self.close()?;
364        Ok(())
365    }
366}
367
368/// Arrow-specific configuration settings for writing parquet files.
369///
370/// See [`ArrowWriter`] for how to configure the writer.
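///
/// A minimal sketch (the field and option values are purely illustrative), assuming an
/// in-memory `Vec<u8>` sink:
///
/// ```no_run
/// # use std::sync::Arc;
/// # use arrow_schema::{DataType, Field, Schema};
/// # use parquet::arrow::arrow_writer::{ArrowWriter, ArrowWriterOptions};
/// # use parquet::file::properties::WriterProperties;
/// let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, false)]));
/// let options = ArrowWriterOptions::new()
///     .with_properties(WriterProperties::default())
///     .with_skip_arrow_metadata(true);
/// let _writer = ArrowWriter::try_new_with_options(Vec::new(), schema, options).unwrap();
/// ```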
371#[derive(Debug, Clone, Default)]
372pub struct ArrowWriterOptions {
373    properties: WriterProperties,
374    skip_arrow_metadata: bool,
375    schema_root: Option<String>,
376}
377
378impl ArrowWriterOptions {
379    /// Creates a new [`ArrowWriterOptions`] with the default settings.
380    pub fn new() -> Self {
381        Self::default()
382    }
383
384    /// Sets the [`WriterProperties`] for writing parquet files.
385    pub fn with_properties(self, properties: WriterProperties) -> Self {
386        Self { properties, ..self }
387    }
388
389    /// Skip encoding the embedded arrow metadata (defaults to `false`)
390    ///
391    /// Parquet files generated by the [`ArrowWriter`] contain embedded arrow schema
392    /// by default.
393    ///
394    /// Set `skip_arrow_metadata` to `true` to skip encoding the embedded metadata.
395    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
396        Self {
397            skip_arrow_metadata,
398            ..self
399        }
400    }
401
402    /// Set the name of the root parquet schema element (defaults to `"arrow_schema"`)
403    pub fn with_schema_root(self, schema_root: String) -> Self {
404        Self {
405            schema_root: Some(schema_root),
406            ..self
407        }
408    }
409}
410
411/// A single column chunk produced by [`ArrowColumnWriter`]
412#[derive(Default)]
413struct ArrowColumnChunkData {
414    length: usize,
415    data: Vec<Bytes>,
416}
417
418impl Length for ArrowColumnChunkData {
419    fn len(&self) -> u64 {
420        self.length as _
421    }
422}
423
424impl ChunkReader for ArrowColumnChunkData {
425    type T = ArrowColumnChunkReader;
426
427    fn get_read(&self, start: u64) -> Result<Self::T> {
428        assert_eq!(start, 0); // Assume append_column writes all data in one-shot
429        Ok(ArrowColumnChunkReader(
430            self.data.clone().into_iter().peekable(),
431        ))
432    }
433
434    fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
435        unimplemented!()
436    }
437}
438
439/// A [`Read`] for [`ArrowColumnChunkData`]
440struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
441
442impl Read for ArrowColumnChunkReader {
443    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
444        let buffer = loop {
445            match self.0.peek_mut() {
446                Some(b) if b.is_empty() => {
447                    self.0.next();
448                    continue;
449                }
450                Some(b) => break b,
451                None => return Ok(0),
452            }
453        };
454
455        let len = buffer.len().min(out.len());
456        let b = buffer.split_to(len);
457        out[..len].copy_from_slice(&b);
458        Ok(len)
459    }
460}
461
462/// A shared [`ArrowColumnChunkData`]
463///
464/// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access via
465/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows
466type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
467
468#[derive(Default)]
469struct ArrowPageWriter {
470    buffer: SharedColumnChunk,
471    #[cfg(feature = "encryption")]
472    page_encryptor: Option<PageEncryptor>,
473}
474
475impl ArrowPageWriter {
476    #[cfg(feature = "encryption")]
477    pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
478        self.page_encryptor = page_encryptor;
479        self
480    }
481
482    #[cfg(feature = "encryption")]
483    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
484        self.page_encryptor.as_mut()
485    }
486
487    #[cfg(not(feature = "encryption"))]
488    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
489        None
490    }
491}
492
493impl PageWriter for ArrowPageWriter {
494    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
495        let page = match self.page_encryptor_mut() {
496            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
497            None => page,
498        };
499
500        let page_header = page.to_thrift_header();
501        let header = {
502            let mut header = Vec::with_capacity(1024);
503
504            match self.page_encryptor_mut() {
505                Some(page_encryptor) => {
506                    page_encryptor.encrypt_page_header(&page_header, &mut header)?;
507                    if page.compressed_page().is_data_page() {
508                        page_encryptor.increment_page();
509                    }
510                }
511                None => {
512                    let mut protocol = TCompactOutputProtocol::new(&mut header);
513                    page_header.write_to_out_protocol(&mut protocol)?;
514                }
515            };
516
517            Bytes::from(header)
518        };
519
520        let mut buf = self.buffer.try_lock().unwrap();
521
522        let data = page.compressed_page().buffer().clone();
523        let compressed_size = data.len() + header.len();
524
525        let mut spec = PageWriteSpec::new();
526        spec.page_type = page.page_type();
527        spec.num_values = page.num_values();
528        spec.uncompressed_size = page.uncompressed_size() + header.len();
529        spec.offset = buf.length as u64;
530        spec.compressed_size = compressed_size;
531        spec.bytes_written = compressed_size as u64;
532
533        buf.length += compressed_size;
534        buf.data.push(header);
535        buf.data.push(data);
536
537        Ok(spec)
538    }
539
540    fn close(&mut self) -> Result<()> {
541        Ok(())
542    }
543}
544
545/// A leaf column that can be encoded by [`ArrowColumnWriter`]
546#[derive(Debug)]
547pub struct ArrowLeafColumn(ArrayLevels);
548
549/// Computes the [`ArrowLeafColumn`] for a potentially nested [`ArrayRef`]
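///
/// A minimal sketch (a single flat column, so exactly one leaf is expected):
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int32Array};
/// # use arrow_schema::{DataType, Field};
/// # use parquet::arrow::arrow_writer::compute_leaves;
/// let field = Field::new("i32", DataType::Int32, false);
/// let array = Arc::new(Int32Array::from_iter_values([1, 2, 3])) as ArrayRef;
/// let leaves = compute_leaves(&field, &array).unwrap();
/// assert_eq!(leaves.len(), 1);
/// ```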
550pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
551    let levels = calculate_array_levels(array, field)?;
552    Ok(levels.into_iter().map(ArrowLeafColumn).collect())
553}
554
555/// The data for a single column chunk, see [`ArrowColumnWriter`]
556pub struct ArrowColumnChunk {
557    data: ArrowColumnChunkData,
558    close: ColumnCloseResult,
559}
560
561impl std::fmt::Debug for ArrowColumnChunk {
562    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
563        f.debug_struct("ArrowColumnChunk")
564            .field("length", &self.data.length)
565            .finish_non_exhaustive()
566    }
567}
568
569impl ArrowColumnChunk {
570    /// Calls [`SerializedRowGroupWriter::append_column`] with this column's data
571    pub fn append_to_row_group<W: Write + Send>(
572        self,
573        writer: &mut SerializedRowGroupWriter<'_, W>,
574    ) -> Result<()> {
575        writer.append_column(&self.data, self.close)
576    }
577}
578
579/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
580///
581/// Note: This is a low-level interface for applications that require
582/// fine-grained control of encoding (e.g. encoding using multiple threads),
583/// see [`ArrowWriter`] for a higher-level interface
584///
585/// # Example: Encoding two Arrow Arrays in Parallel
586/// ```
587/// // The arrow schema
588/// # use std::sync::Arc;
589/// # use arrow_array::*;
590/// # use arrow_schema::*;
591/// # use parquet::arrow::ArrowSchemaConverter;
592/// # use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, get_column_writers, ArrowColumnChunk};
593/// # use parquet::file::properties::WriterProperties;
594/// # use parquet::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
595/// #
596/// let schema = Arc::new(Schema::new(vec![
597///     Field::new("i32", DataType::Int32, false),
598///     Field::new("f32", DataType::Float32, false),
599/// ]));
600///
601/// // Compute the parquet schema
602/// let props = Arc::new(WriterProperties::default());
603/// let parquet_schema = ArrowSchemaConverter::new()
604///   .with_coerce_types(props.coerce_types())
605///   .convert(&schema)
606///   .unwrap();
607///
608/// // Create writers for each of the leaf columns
609/// let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
610///
611/// // Spawn a worker thread for each column
612/// //
613/// // Note: This is for demonstration purposes, a thread pool (e.g. rayon or tokio) would be better.
614/// // The `map` produces an iterator of `(thread handle, send channel)` tuples.
615/// let mut workers: Vec<_> = col_writers
616///     .into_iter()
617///     .map(|mut col_writer| {
618///         let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
619///         let handle = std::thread::spawn(move || {
620///             // receive Arrays to encode via the channel
621///             for col in recv {
622///                 col_writer.write(&col)?;
623///             }
624///             // once the input is complete, close the writer
625///             // to return the newly created ArrowColumnChunk
626///             col_writer.close()
627///         });
628///         (handle, send)
629///     })
630///     .collect();
631///
632/// // Create parquet writer
633/// let root_schema = parquet_schema.root_schema_ptr();
634/// // write to memory in the example, but this could be a File
635/// let mut out = Vec::with_capacity(1024);
636/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone())
637///   .unwrap();
638///
639/// // Start row group
640/// let mut row_group_writer: SerializedRowGroupWriter<'_, _> = writer
641///   .next_row_group()
642///   .unwrap();
643///
644/// // Create some example input columns to encode
645/// let to_write = vec![
646///     Arc::new(Int32Array::from_iter_values([1, 2, 3])) as _,
647///     Arc::new(Float32Array::from_iter_values([1., 45., -1.])) as _,
648/// ];
649///
650/// // Send the input columns to the workers
651/// let mut worker_iter = workers.iter_mut();
652/// for (arr, field) in to_write.iter().zip(&schema.fields) {
653///     for leaves in compute_leaves(field, arr).unwrap() {
654///         worker_iter.next().unwrap().1.send(leaves).unwrap();
655///     }
656/// }
657///
658/// // Wait for the workers to complete encoding, and append
659/// // the resulting column chunks to the row group (and the file)
660/// for (handle, send) in workers {
661///     drop(send); // Drop send side to signal termination
662///     // wait for the worker to send the completed chunk
663///     let chunk: ArrowColumnChunk = handle.join().unwrap().unwrap();
664///     chunk.append_to_row_group(&mut row_group_writer).unwrap();
665/// }
666/// // Close the row group which writes to the underlying file
667/// row_group_writer.close().unwrap();
668///
669/// let metadata = writer.close().unwrap();
670/// assert_eq!(metadata.num_rows, 3);
671/// ```
672pub struct ArrowColumnWriter {
673    writer: ArrowColumnWriterImpl,
674    chunk: SharedColumnChunk,
675}
676
677impl std::fmt::Debug for ArrowColumnWriter {
678    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
679        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
680    }
681}
682
683enum ArrowColumnWriterImpl {
684    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
685    Column(ColumnWriter<'static>),
686}
687
688impl ArrowColumnWriter {
689    /// Write an [`ArrowLeafColumn`]
690    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
691        match &mut self.writer {
692            ArrowColumnWriterImpl::Column(c) => {
693                write_leaf(c, &col.0)?;
694            }
695            ArrowColumnWriterImpl::ByteArray(c) => {
696                write_primitive(c, col.0.array().as_ref(), &col.0)?;
697            }
698        }
699        Ok(())
700    }
701
702    /// Close this column returning the written [`ArrowColumnChunk`]
703    pub fn close(self) -> Result<ArrowColumnChunk> {
704        let close = match self.writer {
705            ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
706            ArrowColumnWriterImpl::Column(c) => c.close()?,
707        };
708        let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
709        let data = chunk.into_inner().unwrap();
710        Ok(ArrowColumnChunk { data, close })
711    }
712
713    /// Returns the estimated total memory usage by the writer.
714    ///
715    /// Unlike [`Self::get_estimated_total_bytes`], this is an estimate
716    /// of the current memory usage and not its anticipated encoded size.
717    ///
718    /// This includes:
719    /// 1. Data buffered in encoded form
720    /// 2. Data buffered in un-encoded form (e.g. `usize` dictionary keys)
721    ///
722    /// This value should be greater than or equal to [`Self::get_estimated_total_bytes`]
723    pub fn memory_size(&self) -> usize {
724        match &self.writer {
725            ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
726            ArrowColumnWriterImpl::Column(c) => c.memory_size(),
727        }
728    }
729
730    /// Returns the estimated total encoded bytes for this column writer.
731    ///
732    /// This includes:
733    /// 1. Data buffered in encoded form
734    /// 2. An estimate of how large the data buffered in un-encoded form would be once encoded
735    ///
736    /// This value should be less than or equal to [`Self::memory_size`]
737    pub fn get_estimated_total_bytes(&self) -> usize {
738        match &self.writer {
739            ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
740            ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
741        }
742    }
743}
744
745/// Encodes [`RecordBatch`] to a parquet row group
746struct ArrowRowGroupWriter {
747    writers: Vec<ArrowColumnWriter>,
748    schema: SchemaRef,
749    buffered_rows: usize,
750}
751
752impl ArrowRowGroupWriter {
753    fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
754        Self {
755            writers,
756            schema: arrow.clone(),
757            buffered_rows: 0,
758        }
759    }
760
761    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
762        self.buffered_rows += batch.num_rows();
763        let mut writers = self.writers.iter_mut();
764        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
765            for leaf in compute_leaves(field.as_ref(), column)? {
766                writers.next().unwrap().write(&leaf)?
767            }
768        }
769        Ok(())
770    }
771
772    fn close(self) -> Result<Vec<ArrowColumnChunk>> {
773        self.writers
774            .into_iter()
775            .map(|writer| writer.close())
776            .collect()
777    }
778}
779
780struct ArrowRowGroupWriterFactory {
781    #[cfg(feature = "encryption")]
782    file_encryptor: Option<Arc<FileEncryptor>>,
783}
784
785impl ArrowRowGroupWriterFactory {
786    #[cfg(feature = "encryption")]
787    fn new<W: Write + Send>(file_writer: &SerializedFileWriter<W>) -> Self {
788        Self {
789            file_encryptor: file_writer.file_encryptor(),
790        }
791    }
792
793    #[cfg(not(feature = "encryption"))]
794    fn new<W: Write + Send>(_file_writer: &SerializedFileWriter<W>) -> Self {
795        Self {}
796    }
797
798    #[cfg(feature = "encryption")]
799    fn create_row_group_writer(
800        &self,
801        parquet: &SchemaDescriptor,
802        props: &WriterPropertiesPtr,
803        arrow: &SchemaRef,
804        row_group_index: usize,
805    ) -> Result<ArrowRowGroupWriter> {
806        let writers = get_column_writers_with_encryptor(
807            parquet,
808            props,
809            arrow,
810            self.file_encryptor.clone(),
811            row_group_index,
812        )?;
813        Ok(ArrowRowGroupWriter::new(writers, arrow))
814    }
815
816    #[cfg(not(feature = "encryption"))]
817    fn create_row_group_writer(
818        &self,
819        parquet: &SchemaDescriptor,
820        props: &WriterPropertiesPtr,
821        arrow: &SchemaRef,
822        _row_group_index: usize,
823    ) -> Result<ArrowRowGroupWriter> {
824        let writers = get_column_writers(parquet, props, arrow)?;
825        Ok(ArrowRowGroupWriter::new(writers, arrow))
826    }
827}
828
829/// Returns an [`ArrowColumnWriter`] for each leaf column in the given schema
830pub fn get_column_writers(
831    parquet: &SchemaDescriptor,
832    props: &WriterPropertiesPtr,
833    arrow: &SchemaRef,
834) -> Result<Vec<ArrowColumnWriter>> {
835    let mut writers = Vec::with_capacity(arrow.fields.len());
836    let mut leaves = parquet.columns().iter();
837    let column_factory = ArrowColumnWriterFactory::new();
838    for field in &arrow.fields {
839        column_factory.get_arrow_column_writer(
840            field.data_type(),
841            props,
842            &mut leaves,
843            &mut writers,
844        )?;
845    }
846    Ok(writers)
847}
848
849/// Returns an [`ArrowColumnWriter`] for each leaf column in the given schema, with support for columnar encryption
850#[cfg(feature = "encryption")]
851fn get_column_writers_with_encryptor(
852    parquet: &SchemaDescriptor,
853    props: &WriterPropertiesPtr,
854    arrow: &SchemaRef,
855    file_encryptor: Option<Arc<FileEncryptor>>,
856    row_group_index: usize,
857) -> Result<Vec<ArrowColumnWriter>> {
858    let mut writers = Vec::with_capacity(arrow.fields.len());
859    let mut leaves = parquet.columns().iter();
860    let column_factory =
861        ArrowColumnWriterFactory::new().with_file_encryptor(row_group_index, file_encryptor);
862    for field in &arrow.fields {
863        column_factory.get_arrow_column_writer(
864            field.data_type(),
865            props,
866            &mut leaves,
867            &mut writers,
868        )?;
869    }
870    Ok(writers)
871}
872
873/// Gets [`ArrowColumnWriter`] instances for different data types
874struct ArrowColumnWriterFactory {
875    #[cfg(feature = "encryption")]
876    row_group_index: usize,
877    #[cfg(feature = "encryption")]
878    file_encryptor: Option<Arc<FileEncryptor>>,
879}
880
881impl ArrowColumnWriterFactory {
882    pub fn new() -> Self {
883        Self {
884            #[cfg(feature = "encryption")]
885            row_group_index: 0,
886            #[cfg(feature = "encryption")]
887            file_encryptor: None,
888        }
889    }
890
891    #[cfg(feature = "encryption")]
892    pub fn with_file_encryptor(
893        mut self,
894        row_group_index: usize,
895        file_encryptor: Option<Arc<FileEncryptor>>,
896    ) -> Self {
897        self.row_group_index = row_group_index;
898        self.file_encryptor = file_encryptor;
899        self
900    }
901
902    #[cfg(feature = "encryption")]
903    fn create_page_writer(
904        &self,
905        column_descriptor: &ColumnDescPtr,
906        column_index: usize,
907    ) -> Result<Box<ArrowPageWriter>> {
908        let column_path = column_descriptor.path().string();
909        let page_encryptor = PageEncryptor::create_if_column_encrypted(
910            &self.file_encryptor,
911            self.row_group_index,
912            column_index,
913            &column_path,
914        )?;
915        Ok(Box::new(
916            ArrowPageWriter::default().with_encryptor(page_encryptor),
917        ))
918    }
919
920    #[cfg(not(feature = "encryption"))]
921    fn create_page_writer(
922        &self,
923        _column_descriptor: &ColumnDescPtr,
924        _column_index: usize,
925    ) -> Result<Box<ArrowPageWriter>> {
926        Ok(Box::<ArrowPageWriter>::default())
927    }
928
929    /// Gets the [`ArrowColumnWriter`] for the given `data_type`
930    fn get_arrow_column_writer(
931        &self,
932        data_type: &ArrowDataType,
933        props: &WriterPropertiesPtr,
934        leaves: &mut Iter<'_, ColumnDescPtr>,
935        out: &mut Vec<ArrowColumnWriter>,
936    ) -> Result<()> {
937        let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
938            let page_writer = self.create_page_writer(desc, out.len())?;
939            let chunk = page_writer.buffer.clone();
940            let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
941            Ok(ArrowColumnWriter {
942                chunk,
943                writer: ArrowColumnWriterImpl::Column(writer),
944            })
945        };
946
947        let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
948            let page_writer = self.create_page_writer(desc, out.len())?;
949            let chunk = page_writer.buffer.clone();
950            let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
951            Ok(ArrowColumnWriter {
952                chunk,
953                writer: ArrowColumnWriterImpl::ByteArray(writer),
954            })
955        };
956
957        match data_type {
958            _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
959            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => out.push(col(leaves.next().unwrap())?),
960            ArrowDataType::LargeBinary
961            | ArrowDataType::Binary
962            | ArrowDataType::Utf8
963            | ArrowDataType::LargeUtf8
964            | ArrowDataType::BinaryView
965            | ArrowDataType::Utf8View => {
966                out.push(bytes(leaves.next().unwrap())?)
967            }
968            ArrowDataType::List(f)
969            | ArrowDataType::LargeList(f)
970            | ArrowDataType::FixedSizeList(f, _) => {
971                self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
972            }
973            ArrowDataType::Struct(fields) => {
974                for field in fields {
975                    self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
976                }
977            }
978            ArrowDataType::Map(f, _) => match f.data_type() {
979                ArrowDataType::Struct(f) => {
980                    self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
981                    self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
982                }
983                _ => unreachable!("invalid map type"),
984            }
985            ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
986                ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Binary | ArrowDataType::LargeBinary => {
987                    out.push(bytes(leaves.next().unwrap())?)
988                }
989                ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
990                    out.push(bytes(leaves.next().unwrap())?)
991                }
992                ArrowDataType::FixedSizeBinary(_) => {
993                    out.push(bytes(leaves.next().unwrap())?)
994                }
995                _ => {
996                    out.push(col(leaves.next().unwrap())?)
997                }
998            }
999            _ => return Err(ParquetError::NYI(
1000                format!(
1001                    "Attempting to write an Arrow type {data_type:?} to parquet that is not yet implemented"
1002                )
1003            ))
1004        }
1005        Ok(())
1006    }
1007}
1008
1009fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
1010    let column = levels.array().as_ref();
1011    let indices = levels.non_null_indices();
1012    match writer {
1013        ColumnWriter::Int32ColumnWriter(ref mut typed) => {
1014            match column.data_type() {
1015                ArrowDataType::Date64 => {
1016                    // If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
1017                    let array = arrow_cast::cast(column, &ArrowDataType::Date32)?;
1018                    let array = arrow_cast::cast(&array, &ArrowDataType::Int32)?;
1019
1020                    let array = array.as_primitive::<Int32Type>();
1021                    write_primitive(typed, array.values(), levels)
1022                }
1023                ArrowDataType::UInt32 => {
1024                    let values = column.as_primitive::<UInt32Type>().values();
1025                    // follow C++ implementation and use overflow/reinterpret cast from u32 to i32 which will map
1026                    // `(i32::MAX as u32)..u32::MAX` to `i32::MIN..0`
1027                    let array = values.inner().typed_data::<i32>();
1028                    write_primitive(typed, array, levels)
1029                }
1030                ArrowDataType::Decimal128(_, _) => {
1031                    // use the int32 to represent the decimal with low precision
1032                    let array = column
1033                        .as_primitive::<Decimal128Type>()
1034                        .unary::<_, Int32Type>(|v| v as i32);
1035                    write_primitive(typed, array.values(), levels)
1036                }
1037                ArrowDataType::Decimal256(_, _) => {
1038                    // use the int32 to represent the decimal with low precision
1039                    let array = column
1040                        .as_primitive::<Decimal256Type>()
1041                        .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1042                    write_primitive(typed, array.values(), levels)
1043                }
1044                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1045                    ArrowDataType::Decimal128(_, _) => {
1046                        let array = arrow_cast::cast(column, value_type)?;
1047                        let array = array
1048                            .as_primitive::<Decimal128Type>()
1049                            .unary::<_, Int32Type>(|v| v as i32);
1050                        write_primitive(typed, array.values(), levels)
1051                    }
1052                    ArrowDataType::Decimal256(_, _) => {
1053                        let array = arrow_cast::cast(column, value_type)?;
1054                        let array = array
1055                            .as_primitive::<Decimal256Type>()
1056                            .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1057                        write_primitive(typed, array.values(), levels)
1058                    }
1059                    _ => {
1060                        let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
1061                        let array = array.as_primitive::<Int32Type>();
1062                        write_primitive(typed, array.values(), levels)
1063                    }
1064                },
1065                _ => {
1066                    let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
1067                    let array = array.as_primitive::<Int32Type>();
1068                    write_primitive(typed, array.values(), levels)
1069                }
1070            }
1071        }
1072        ColumnWriter::BoolColumnWriter(ref mut typed) => {
1073            let array = column.as_boolean();
1074            typed.write_batch(
1075                get_bool_array_slice(array, indices).as_slice(),
1076                levels.def_levels(),
1077                levels.rep_levels(),
1078            )
1079        }
1080        ColumnWriter::Int64ColumnWriter(ref mut typed) => {
1081            match column.data_type() {
1082                ArrowDataType::Date64 => {
1083                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1084
1085                    let array = array.as_primitive::<Int64Type>();
1086                    write_primitive(typed, array.values(), levels)
1087                }
1088                ArrowDataType::Int64 => {
1089                    let array = column.as_primitive::<Int64Type>();
1090                    write_primitive(typed, array.values(), levels)
1091                }
1092                ArrowDataType::UInt64 => {
1093                    let values = column.as_primitive::<UInt64Type>().values();
1094                    // follow C++ implementation and use overflow/reinterpret cast from u64 to i64 which will map
1095                    // `(i64::MAX as u64)..u64::MAX` to `i64::MIN..0`
1096                    let array = values.inner().typed_data::<i64>();
1097                    write_primitive(typed, array, levels)
1098                }
1099                ArrowDataType::Decimal128(_, _) => {
1100                    // use the int64 to represent the decimal with low precision
1101                    let array = column
1102                        .as_primitive::<Decimal128Type>()
1103                        .unary::<_, Int64Type>(|v| v as i64);
1104                    write_primitive(typed, array.values(), levels)
1105                }
1106                ArrowDataType::Decimal256(_, _) => {
1107                    // use the int64 to represent the decimal with low precision
1108                    let array = column
1109                        .as_primitive::<Decimal256Type>()
1110                        .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1111                    write_primitive(typed, array.values(), levels)
1112                }
1113                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1114                    ArrowDataType::Decimal128(_, _) => {
1115                        let array = arrow_cast::cast(column, value_type)?;
1116                        let array = array
1117                            .as_primitive::<Decimal128Type>()
1118                            .unary::<_, Int64Type>(|v| v as i64);
1119                        write_primitive(typed, array.values(), levels)
1120                    }
1121                    ArrowDataType::Decimal256(_, _) => {
1122                        let array = arrow_cast::cast(column, value_type)?;
1123                        let array = array
1124                            .as_primitive::<Decimal256Type>()
1125                            .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1126                        write_primitive(typed, array.values(), levels)
1127                    }
1128                    _ => {
1129                        let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1130                        let array = array.as_primitive::<Int64Type>();
1131                        write_primitive(typed, array.values(), levels)
1132                    }
1133                },
1134                _ => {
1135                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1136                    let array = array.as_primitive::<Int64Type>();
1137                    write_primitive(typed, array.values(), levels)
1138                }
1139            }
1140        }
1141        ColumnWriter::Int96ColumnWriter(ref mut _typed) => {
1142            unreachable!("Currently unreachable because data type not supported")
1143        }
1144        ColumnWriter::FloatColumnWriter(ref mut typed) => {
1145            let array = column.as_primitive::<Float32Type>();
1146            write_primitive(typed, array.values(), levels)
1147        }
1148        ColumnWriter::DoubleColumnWriter(ref mut typed) => {
1149            let array = column.as_primitive::<Float64Type>();
1150            write_primitive(typed, array.values(), levels)
1151        }
1152        ColumnWriter::ByteArrayColumnWriter(_) => {
1153            unreachable!("should use ByteArrayWriter")
1154        }
1155        ColumnWriter::FixedLenByteArrayColumnWriter(ref mut typed) => {
1156            let bytes = match column.data_type() {
1157                ArrowDataType::Interval(interval_unit) => match interval_unit {
1158                    IntervalUnit::YearMonth => {
1159                        let array = column
1160                            .as_any()
1161                            .downcast_ref::<arrow_array::IntervalYearMonthArray>()
1162                            .unwrap();
1163                        get_interval_ym_array_slice(array, indices)
1164                    }
1165                    IntervalUnit::DayTime => {
1166                        let array = column
1167                            .as_any()
1168                            .downcast_ref::<arrow_array::IntervalDayTimeArray>()
1169                            .unwrap();
1170                        get_interval_dt_array_slice(array, indices)
1171                    }
1172                    _ => {
1173                        return Err(ParquetError::NYI(
1174                            format!(
1175                                "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
1176                            )
1177                        ));
1178                    }
1179                },
1180                ArrowDataType::FixedSizeBinary(_) => {
1181                    let array = column
1182                        .as_any()
1183                        .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
1184                        .unwrap();
1185                    get_fsb_array_slice(array, indices)
1186                }
1187                ArrowDataType::Decimal128(_, _) => {
1188                    let array = column.as_primitive::<Decimal128Type>();
1189                    get_decimal_128_array_slice(array, indices)
1190                }
1191                ArrowDataType::Decimal256(_, _) => {
1192                    let array = column
1193                        .as_any()
1194                        .downcast_ref::<arrow_array::Decimal256Array>()
1195                        .unwrap();
1196                    get_decimal_256_array_slice(array, indices)
1197                }
1198                ArrowDataType::Float16 => {
1199                    let array = column.as_primitive::<Float16Type>();
1200                    get_float_16_array_slice(array, indices)
1201                }
1202                _ => {
1203                    return Err(ParquetError::NYI(
1204                        "Attempting to write an Arrow type that is not yet implemented".to_string(),
1205                    ));
1206                }
1207            };
1208            typed.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels())
1209        }
1210    }
1211}
1212
1213fn write_primitive<E: ColumnValueEncoder>(
1214    writer: &mut GenericColumnWriter<E>,
1215    values: &E::Values,
1216    levels: &ArrayLevels,
1217) -> Result<usize> {
1218    writer.write_batch_internal(
1219        values,
1220        Some(levels.non_null_indices()),
1221        levels.def_levels(),
1222        levels.rep_levels(),
1223        None,
1224        None,
1225        None,
1226    )
1227}
1228
1229fn get_bool_array_slice(array: &arrow_array::BooleanArray, indices: &[usize]) -> Vec<bool> {
1230    let mut values = Vec::with_capacity(indices.len());
1231    for i in indices {
1232        values.push(array.value(*i))
1233    }
1234    values
1235}
1236
1237/// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each).
1238/// An Arrow YearMonth interval only stores months, thus only the first 4 bytes are populated.
1239fn get_interval_ym_array_slice(
1240    array: &arrow_array::IntervalYearMonthArray,
1241    indices: &[usize],
1242) -> Vec<FixedLenByteArray> {
1243    let mut values = Vec::with_capacity(indices.len());
1244    for i in indices {
1245        let mut value = array.value(*i).to_le_bytes().to_vec();
1246        let mut suffix = vec![0; 8];
1247        value.append(&mut suffix);
1248        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1249    }
1250    values
1251}
1252
1253/// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each).
1254/// An Arrow DayTime interval only stores days and millis, thus the first 4 bytes are not populated.
1255fn get_interval_dt_array_slice(
1256    array: &arrow_array::IntervalDayTimeArray,
1257    indices: &[usize],
1258) -> Vec<FixedLenByteArray> {
1259    let mut values = Vec::with_capacity(indices.len());
1260    for i in indices {
1261        let mut out = [0; 12];
1262        let value = array.value(*i);
1263        out[4..8].copy_from_slice(&value.days.to_le_bytes());
1264        out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1265        values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1266    }
1267    values
1268}
1269
1270fn get_decimal_128_array_slice(
1271    array: &arrow_array::Decimal128Array,
1272    indices: &[usize],
1273) -> Vec<FixedLenByteArray> {
1274    let mut values = Vec::with_capacity(indices.len());
1275    let size = decimal_length_from_precision(array.precision());
1276    for i in indices {
1277        let as_be_bytes = array.value(*i).to_be_bytes();
1278        let resized_value = as_be_bytes[(16 - size)..].to_vec();
1279        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1280    }
1281    values
1282}
1283
1284fn get_decimal_256_array_slice(
1285    array: &arrow_array::Decimal256Array,
1286    indices: &[usize],
1287) -> Vec<FixedLenByteArray> {
1288    let mut values = Vec::with_capacity(indices.len());
1289    let size = decimal_length_from_precision(array.precision());
1290    for i in indices {
1291        let as_be_bytes = array.value(*i).to_be_bytes();
1292        let resized_value = as_be_bytes[(32 - size)..].to_vec();
1293        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1294    }
1295    values
1296}
1297
1298fn get_float_16_array_slice(
1299    array: &arrow_array::Float16Array,
1300    indices: &[usize],
1301) -> Vec<FixedLenByteArray> {
1302    let mut values = Vec::with_capacity(indices.len());
1303    for i in indices {
1304        let value = array.value(*i).to_le_bytes().to_vec();
1305        values.push(FixedLenByteArray::from(ByteArray::from(value)));
1306    }
1307    values
1308}
1309
1310fn get_fsb_array_slice(
1311    array: &arrow_array::FixedSizeBinaryArray,
1312    indices: &[usize],
1313) -> Vec<FixedLenByteArray> {
1314    let mut values = Vec::with_capacity(indices.len());
1315    for i in indices {
1316        let value = array.value(*i).to_vec();
1317        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1318    }
1319    values
1320}
1321
1322#[cfg(test)]
1323mod tests {
1324    use super::*;
1325
1326    use std::fs::File;
1327    use std::io::Seek;
1328
1329    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1330    use crate::arrow::ARROW_SCHEMA_META_KEY;
1331    use crate::format::PageHeader;
1332    use crate::thrift::TCompactSliceInputProtocol;
1333    use arrow::datatypes::ToByteSlice;
1334    use arrow::datatypes::{DataType, Schema};
1335    use arrow::error::Result as ArrowResult;
1336    use arrow::util::data_gen::create_random_array;
1337    use arrow::util::pretty::pretty_format_batches;
1338    use arrow::{array::*, buffer::Buffer};
1339    use arrow_buffer::{i256, IntervalDayTime, IntervalMonthDayNano, NullBuffer};
1340    use arrow_schema::Fields;
1341    use half::f16;
1342    use num::{FromPrimitive, ToPrimitive};
1343
1344    use crate::basic::Encoding;
1345    use crate::data_type::AsBytes;
1346    use crate::file::metadata::ParquetMetaData;
1347    use crate::file::page_index::index::Index;
1348    use crate::file::page_index::index_reader::read_offset_indexes;
1349    use crate::file::properties::{
1350        BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1351    };
1352    use crate::file::serialized_reader::ReadOptionsBuilder;
1353    use crate::file::{
1354        reader::{FileReader, SerializedFileReader},
1355        statistics::Statistics,
1356    };
1357
1358    #[test]
1359    fn arrow_writer() {
1360        // define schema
1361        let schema = Schema::new(vec![
1362            Field::new("a", DataType::Int32, false),
1363            Field::new("b", DataType::Int32, true),
1364        ]);
1365
1366        // create some data
1367        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1368        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1369
1370        // build a record batch
1371        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
1372
1373        roundtrip(batch, Some(SMALL_SIZE / 2));
1374    }
1375
1376    fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1377        let mut buffer = vec![];
1378
1379        let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
1380        writer.write(expected_batch).unwrap();
1381        writer.close().unwrap();
1382
1383        buffer
1384    }
1385
1386    fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1387        let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
1388        writer.write(expected_batch).unwrap();
1389        writer.into_inner().unwrap()
1390    }
1391
1392    #[test]
1393    fn roundtrip_bytes() {
1394        // define schema
1395        let schema = Arc::new(Schema::new(vec![
1396            Field::new("a", DataType::Int32, false),
1397            Field::new("b", DataType::Int32, true),
1398        ]));
1399
1400        // create some data
1401        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1402        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1403
1404        // build a record batch
1405        let expected_batch =
1406            RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
1407
1408        for buffer in [
1409            get_bytes_after_close(schema.clone(), &expected_batch),
1410            get_bytes_by_into_inner(schema, &expected_batch),
1411        ] {
1412            let cursor = Bytes::from(buffer);
1413            let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
1414
1415            let actual_batch = record_batch_reader
1416                .next()
1417                .expect("No batch found")
1418                .expect("Unable to get batch");
1419
1420            assert_eq!(expected_batch.schema(), actual_batch.schema());
1421            assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
1422            assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
1423            for i in 0..expected_batch.num_columns() {
1424                let expected_data = expected_batch.column(i).to_data();
1425                let actual_data = actual_batch.column(i).to_data();
1426
1427                assert_eq!(expected_data, actual_data);
1428            }
1429        }
1430    }
1431
1432    #[test]
1433    fn arrow_writer_non_null() {
1434        // define schema
1435        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1436
1437        // create some data
1438        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1439
1440        // build a record batch
1441        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1442
1443        roundtrip(batch, Some(SMALL_SIZE / 2));
1444    }
1445
1446    #[test]
1447    fn arrow_writer_list() {
1448        // define schema
1449        let schema = Schema::new(vec![Field::new(
1450            "a",
1451            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1452            true,
1453        )]);
1454
1455        // create some data
1456        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1457
1458        // Construct a buffer for value offsets, for the nested array:
1459        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
1460        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1461
1462        // Construct a list array from the above two
1463        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1464            DataType::Int32,
1465            false,
1466        ))))
1467        .len(5)
1468        .add_buffer(a_value_offsets)
1469        .add_child_data(a_values.into_data())
1470        .null_bit_buffer(Some(Buffer::from([0b00011011])))
1471        .build()
1472        .unwrap();
1473        let a = ListArray::from(a_list_data);
1474
1475        // build a record batch
1476        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1477
1478        assert_eq!(batch.column(0).null_count(), 1);
1479
1480        // This test fails if the max row group size is less than the batch's length
1481        // see https://github.com/apache/arrow-rs/issues/518
1482        roundtrip(batch, None);
1483    }
1484
1485    #[test]
1486    fn arrow_writer_list_non_null() {
1487        // define schema
1488        let schema = Schema::new(vec![Field::new(
1489            "a",
1490            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1491            false,
1492        )]);
1493
1494        // create some data
1495        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1496
1497        // Construct a buffer for value offsets, for the nested array:
1498        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
1499        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1500
1501        // Construct a list array from the above two
1502        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1503            DataType::Int32,
1504            false,
1505        ))))
1506        .len(5)
1507        .add_buffer(a_value_offsets)
1508        .add_child_data(a_values.into_data())
1509        .build()
1510        .unwrap();
1511        let a = ListArray::from(a_list_data);
1512
1513        // build a record batch
1514        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1515
1516        // This test fails if the max row group size is less than the batch's length
1517        // see https://github.com/apache/arrow-rs/issues/518
1518        assert_eq!(batch.column(0).null_count(), 0);
1519
1520        roundtrip(batch, None);
1521    }
1522
1523    #[test]
1524    fn arrow_writer_binary() {
1525        let string_field = Field::new("a", DataType::Utf8, false);
1526        let binary_field = Field::new("b", DataType::Binary, false);
1527        let schema = Schema::new(vec![string_field, binary_field]);
1528
1529        let raw_string_values = vec!["foo", "bar", "baz", "quux"];
1530        let raw_binary_values = [
1531            b"foo".to_vec(),
1532            b"bar".to_vec(),
1533            b"baz".to_vec(),
1534            b"quux".to_vec(),
1535        ];
1536        let raw_binary_value_refs = raw_binary_values
1537            .iter()
1538            .map(|x| x.as_slice())
1539            .collect::<Vec<_>>();
1540
1541        let string_values = StringArray::from(raw_string_values.clone());
1542        let binary_values = BinaryArray::from(raw_binary_value_refs);
1543        let batch = RecordBatch::try_new(
1544            Arc::new(schema),
1545            vec![Arc::new(string_values), Arc::new(binary_values)],
1546        )
1547        .unwrap();
1548
1549        roundtrip(batch, Some(SMALL_SIZE / 2));
1550    }
1551
1552    #[test]
1553    fn arrow_writer_binary_view() {
1554        let string_field = Field::new("a", DataType::Utf8View, false);
1555        let binary_field = Field::new("b", DataType::BinaryView, false);
1556        let nullable_string_field = Field::new("a", DataType::Utf8View, true);
1557        let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
1558
1559        let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
1560        let raw_binary_values = vec![
1561            b"foo".to_vec(),
1562            b"bar".to_vec(),
1563            b"large payload over 12 bytes".to_vec(),
1564            b"lulu".to_vec(),
1565        ];
1566        let nullable_string_values =
1567            vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
1568
1569        let string_view_values = StringViewArray::from(raw_string_values);
1570        let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
1571        let nullable_string_view_values = StringViewArray::from(nullable_string_values);
1572        let batch = RecordBatch::try_new(
1573            Arc::new(schema),
1574            vec![
1575                Arc::new(string_view_values),
1576                Arc::new(binary_view_values),
1577                Arc::new(nullable_string_view_values),
1578            ],
1579        )
1580        .unwrap();
1581
1582        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1583        roundtrip(batch, None);
1584    }
1585
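    /// Builds a single-column `Decimal128` batch with the given precision and scale.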
1586    fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
1587        let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
1588        let schema = Schema::new(vec![decimal_field]);
1589
1590        let decimal_values = vec![10_000, 50_000, 0, -100]
1591            .into_iter()
1592            .map(Some)
1593            .collect::<Decimal128Array>()
1594            .with_precision_and_scale(precision, scale)
1595            .unwrap();
1596
1597        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
1598    }
1599
1600    #[test]
1601    fn arrow_writer_decimal() {
1602        // int32 to store the decimal value
1603        let batch_int32_decimal = get_decimal_batch(5, 2);
1604        roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
1605        // int64 to store the decimal value
1606        let batch_int64_decimal = get_decimal_batch(12, 2);
1607        roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
1608        // fixed_length_byte_array to store the decimal value
1609        let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
1610        roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
1611    }
1612
1613    #[test]
1614    fn arrow_writer_complex() {
1615        // define schema
1616        let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1617        let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1618        let struct_field_g = Arc::new(Field::new_list(
1619            "g",
1620            Field::new_list_field(DataType::Int16, true),
1621            false,
1622        ));
1623        let struct_field_h = Arc::new(Field::new_list(
1624            "h",
1625            Field::new_list_field(DataType::Int16, false),
1626            true,
1627        ));
1628        let struct_field_e = Arc::new(Field::new_struct(
1629            "e",
1630            vec![
1631                struct_field_f.clone(),
1632                struct_field_g.clone(),
1633                struct_field_h.clone(),
1634            ],
1635            false,
1636        ));
1637        let schema = Schema::new(vec![
1638            Field::new("a", DataType::Int32, false),
1639            Field::new("b", DataType::Int32, true),
1640            Field::new_struct(
1641                "c",
1642                vec![struct_field_d.clone(), struct_field_e.clone()],
1643                false,
1644            ),
1645        ]);
1646
1647        // create some data
1648        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1649        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1650        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1651        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1652
1653        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1654
1655        // Construct a buffer for value offsets, for the nested array:
1656        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
1657        let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1658
1659        // Construct a list array from the above two
1660        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1661            .len(5)
1662            .add_buffer(g_value_offsets.clone())
1663            .add_child_data(g_value.to_data())
1664            .build()
1665            .unwrap();
1666        let g = ListArray::from(g_list_data);
1667        // The difference between g and h is that h has a null bitmap
1668        let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
1669            .len(5)
1670            .add_buffer(g_value_offsets)
1671            .add_child_data(g_value.to_data())
1672            .null_bit_buffer(Some(Buffer::from([0b00011011])))
1673            .build()
1674            .unwrap();
1675        let h = ListArray::from(h_list_data);
1676
1677        let e = StructArray::from(vec![
1678            (struct_field_f, Arc::new(f) as ArrayRef),
1679            (struct_field_g, Arc::new(g) as ArrayRef),
1680            (struct_field_h, Arc::new(h) as ArrayRef),
1681        ]);
1682
1683        let c = StructArray::from(vec![
1684            (struct_field_d, Arc::new(d) as ArrayRef),
1685            (struct_field_e, Arc::new(e) as ArrayRef),
1686        ]);
1687
1688        // build a record batch
1689        let batch = RecordBatch::try_new(
1690            Arc::new(schema),
1691            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
1692        )
1693        .unwrap();
1694
1695        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1696        roundtrip(batch, Some(SMALL_SIZE / 3));
1697    }
1698
1699    #[test]
1700    fn arrow_writer_complex_mixed() {
1701        // This test was added while investigating https://github.com/apache/arrow-rs/issues/244.
1702        // It was subsequently fixed while investigating https://github.com/apache/arrow-rs/issues/245.
1703
1704        // define schema
1705        let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
1706        let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
1707        let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
1708        let schema = Schema::new(vec![Field::new(
1709            "some_nested_object",
1710            DataType::Struct(Fields::from(vec![
1711                offset_field.clone(),
1712                partition_field.clone(),
1713                topic_field.clone(),
1714            ])),
1715            false,
1716        )]);
1717
1718        // create some data
1719        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1720        let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1721        let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
1722
1723        let some_nested_object = StructArray::from(vec![
1724            (offset_field, Arc::new(offset) as ArrayRef),
1725            (partition_field, Arc::new(partition) as ArrayRef),
1726            (topic_field, Arc::new(topic) as ArrayRef),
1727        ]);
1728
1729        // build a record batch
1730        let batch =
1731            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1732
1733        roundtrip(batch, Some(SMALL_SIZE / 2));
1734    }
1735
1736    #[test]
1737    fn arrow_writer_map() {
1738        // Note: we are using the JSON Arrow reader for brevity
1739        let json_content = r#"
1740        {"stocks":{"long": "$AAA", "short": "$BBB"}}
1741        {"stocks":{"long": null, "long": "$CCC", "short": null}}
1742        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1743        "#;
1744        let entries_struct_type = DataType::Struct(Fields::from(vec![
1745            Field::new("key", DataType::Utf8, false),
1746            Field::new("value", DataType::Utf8, true),
1747        ]));
1748        let stocks_field = Field::new(
1749            "stocks",
1750            DataType::Map(
1751                Arc::new(Field::new("entries", entries_struct_type, false)),
1752                false,
1753            ),
1754            true,
1755        );
1756        let schema = Arc::new(Schema::new(vec![stocks_field]));
1757        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
1758        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
1759
1760        let batch = reader.next().unwrap().unwrap();
1761        roundtrip(batch, None);
1762    }
1763
1764    #[test]
1765    fn arrow_writer_2_level_struct() {
1766        // tests writing struct<struct<primitive>>
1767        let field_c = Field::new("c", DataType::Int32, true);
1768        let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
1769        let type_a = DataType::Struct(vec![field_b.clone()].into());
1770        let field_a = Field::new("a", type_a, true);
1771        let schema = Schema::new(vec![field_a.clone()]);
1772
1773        // create data
1774        let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
1775        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
1776            .len(6)
1777            .null_bit_buffer(Some(Buffer::from([0b00100111])))
1778            .add_child_data(c.into_data())
1779            .build()
1780            .unwrap();
1781        let b = StructArray::from(b_data);
1782        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
1783            .len(6)
1784            .null_bit_buffer(Some(Buffer::from([0b00101111])))
1785            .add_child_data(b.into_data())
1786            .build()
1787            .unwrap();
1788        let a = StructArray::from(a_data);
1789
1790        assert_eq!(a.null_count(), 1);
1791        assert_eq!(a.column(0).null_count(), 2);
1792
1793        // build a record batch
1794        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1795
1796        roundtrip(batch, Some(SMALL_SIZE / 2));
1797    }
1798
1799    #[test]
1800    fn arrow_writer_2_level_struct_non_null() {
1801        // tests writing struct<struct<primitive>>
1802        let field_c = Field::new("c", DataType::Int32, false);
1803        let type_b = DataType::Struct(vec![field_c].into());
1804        let field_b = Field::new("b", type_b.clone(), false);
1805        let type_a = DataType::Struct(vec![field_b].into());
1806        let field_a = Field::new("a", type_a.clone(), false);
1807        let schema = Schema::new(vec![field_a]);
1808
1809        // create data
1810        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
1811        let b_data = ArrayDataBuilder::new(type_b)
1812            .len(6)
1813            .add_child_data(c.into_data())
1814            .build()
1815            .unwrap();
1816        let b = StructArray::from(b_data);
1817        let a_data = ArrayDataBuilder::new(type_a)
1818            .len(6)
1819            .add_child_data(b.into_data())
1820            .build()
1821            .unwrap();
1822        let a = StructArray::from(a_data);
1823
1824        assert_eq!(a.null_count(), 0);
1825        assert_eq!(a.column(0).null_count(), 0);
1826
1827        // build a record batch
1828        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1829
1830        roundtrip(batch, Some(SMALL_SIZE / 2));
1831    }
1832
1833    #[test]
1834    fn arrow_writer_2_level_struct_mixed_null() {
1835        // tests writing struct<struct<primitive>>
1836        let field_c = Field::new("c", DataType::Int32, false);
1837        let type_b = DataType::Struct(vec![field_c].into());
1838        let field_b = Field::new("b", type_b.clone(), true);
1839        let type_a = DataType::Struct(vec![field_b].into());
1840        let field_a = Field::new("a", type_a.clone(), false);
1841        let schema = Schema::new(vec![field_a]);
1842
1843        // create data
1844        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
1845        let b_data = ArrayDataBuilder::new(type_b)
1846            .len(6)
1847            .null_bit_buffer(Some(Buffer::from([0b00100111])))
1848            .add_child_data(c.into_data())
1849            .build()
1850            .unwrap();
1851        let b = StructArray::from(b_data);
1852        // a intentionally has no null buffer, to test that this is handled correctly
1853        let a_data = ArrayDataBuilder::new(type_a)
1854            .len(6)
1855            .add_child_data(b.into_data())
1856            .build()
1857            .unwrap();
1858        let a = StructArray::from(a_data);
1859
1860        assert_eq!(a.null_count(), 0);
1861        assert_eq!(a.column(0).null_count(), 2);
1862
1863        // build a record batch
1864        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1865
1866        roundtrip(batch, Some(SMALL_SIZE / 2));
1867    }
1868
1869    #[test]
1870    fn arrow_writer_2_level_struct_mixed_null_2() {
1871        // tests writing struct<struct<primitive>>, where the primitive columns are non-null.
1872        let field_c = Field::new("c", DataType::Int32, false);
1873        let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
1874        let field_e = Field::new(
1875            "e",
1876            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1877            false,
1878        );
1879
1880        let field_b = Field::new(
1881            "b",
1882            DataType::Struct(vec![field_c, field_d, field_e].into()),
1883            false,
1884        );
1885        let type_a = DataType::Struct(vec![field_b.clone()].into());
1886        let field_a = Field::new("a", type_a, true);
1887        let schema = Schema::new(vec![field_a.clone()]);
1888
1889        // create data
1890        let c = Int32Array::from_iter_values(0..6);
1891        let d = FixedSizeBinaryArray::try_from_iter(
1892            ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
1893        )
1894        .expect("four byte values");
1895        let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
1896        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
1897            .len(6)
1898            .add_child_data(c.into_data())
1899            .add_child_data(d.into_data())
1900            .add_child_data(e.into_data())
1901            .build()
1902            .unwrap();
1903        let b = StructArray::from(b_data);
1904        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
1905            .len(6)
1906            .null_bit_buffer(Some(Buffer::from([0b00100101])))
1907            .add_child_data(b.into_data())
1908            .build()
1909            .unwrap();
1910        let a = StructArray::from(a_data);
1911
1912        assert_eq!(a.null_count(), 3);
1913        assert_eq!(a.column(0).null_count(), 0);
1914
1915        // build a record batch
1916        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1917
1918        roundtrip(batch, Some(SMALL_SIZE / 2));
1919    }
1920
1921    #[test]
1922    fn test_fixed_size_binary_in_dict() {
1923        fn test_fixed_size_binary_in_dict_inner<K>()
1924        where
1925            K: ArrowDictionaryKeyType,
1926            K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
1927            <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
1928        {
1929            let field = Field::new(
1930                "a",
1931                DataType::Dictionary(
1932                    Box::new(K::DATA_TYPE),
1933                    Box::new(DataType::FixedSizeBinary(4)),
1934                ),
1935                false,
1936            );
1937            let schema = Schema::new(vec![field]);
1938
1939            let keys: Vec<K::Native> = vec![
1940                K::Native::try_from(0u8).unwrap(),
1941                K::Native::try_from(0u8).unwrap(),
1942                K::Native::try_from(1u8).unwrap(),
1943            ];
1944            let keys = PrimitiveArray::<K>::from_iter_values(keys);
1945            let values = FixedSizeBinaryArray::try_from_iter(
1946                vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
1947            )
1948            .unwrap();
1949
1950            let data = DictionaryArray::<K>::new(keys, Arc::new(values));
1951            let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
1952            roundtrip(batch, None);
1953        }
1954
1955        test_fixed_size_binary_in_dict_inner::<UInt8Type>();
1956        test_fixed_size_binary_in_dict_inner::<UInt16Type>();
1957        test_fixed_size_binary_in_dict_inner::<UInt32Type>();
1958        test_fixed_size_binary_in_dict_inner::<UInt64Type>();
1959        test_fixed_size_binary_in_dict_inner::<Int8Type>();
1960        test_fixed_size_binary_in_dict_inner::<Int16Type>();
1961        test_fixed_size_binary_in_dict_inner::<Int32Type>();
1962        test_fixed_size_binary_in_dict_inner::<Int64Type>();
1963    }
1964
1965    #[test]
1966    fn test_empty_dict() {
1967        let struct_fields = Fields::from(vec![Field::new(
1968            "dict",
1969            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1970            false,
1971        )]);
1972
1973        let schema = Schema::new(vec![Field::new_struct(
1974            "struct",
1975            struct_fields.clone(),
1976            true,
1977        )]);
1978        let dictionary = Arc::new(DictionaryArray::new(
1979            Int32Array::new_null(5),
1980            Arc::new(StringArray::new_null(0)),
1981        ));
1982
1983        let s = StructArray::new(
1984            struct_fields,
1985            vec![dictionary],
1986            Some(NullBuffer::new_null(5)),
1987        );
1988
1989        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
1990        roundtrip(batch, None);
1991    }
1992    #[test]
1993    fn arrow_writer_page_size() {
1994        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
1995
1996        let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
1997
1998        // Generate an array of 10 unique 10-character strings
1999        for i in 0..10 {
2000            let value = i
2001                .to_string()
2002                .repeat(10)
2003                .chars()
2004                .take(10)
2005                .collect::<String>();
2006
2007            builder.append_value(value);
2008        }
2009
2010        let array = Arc::new(builder.finish());
2011
2012        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
2013
2014        let file = tempfile::tempfile().unwrap();
2015
2016        // Set everything very low so we fall back to PLAIN encoding after the first row
2017        let props = WriterProperties::builder()
2018            .set_data_page_size_limit(1)
2019            .set_dictionary_page_size_limit(1)
2020            .set_write_batch_size(1)
2021            .build();
2022
2023        let mut writer =
2024            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
2025                .expect("Unable to write file");
2026        writer.write(&batch).unwrap();
2027        writer.close().unwrap();
2028
2029        let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
2030
2031        let column = reader.metadata().row_group(0).columns();
2032
2033        assert_eq!(column.len(), 1);
2034
2035        // We should write one row before falling back to PLAIN encoding so there should still be a
2036        // dictionary page.
2037        assert!(
2038            column[0].dictionary_page_offset().is_some(),
2039            "Expected a dictionary page"
2040        );
2041
2042        let offset_indexes = read_offset_indexes(&file, column).unwrap().unwrap();
2043
2044        let page_locations = offset_indexes[0].page_locations.clone();
2045
2046        // We should fall back to PLAIN encoding after the first row and our max page size is 1 byte,
2047        // so we expect one dictionary encoded page and then a page per row thereafter.
2048        assert_eq!(
2049            page_locations.len(),
2050            10,
2051            "Expected 10 pages but got {page_locations:#?}"
2052        );
2053    }
2054
2055    #[test]
2056    fn arrow_writer_float_nans() {
2057        let f16_field = Field::new("a", DataType::Float16, false);
2058        let f32_field = Field::new("b", DataType::Float32, false);
2059        let f64_field = Field::new("c", DataType::Float64, false);
2060        let schema = Schema::new(vec![f16_field, f32_field, f64_field]);
2061
2062        let f16_values = (0..MEDIUM_SIZE)
2063            .map(|i| {
2064                Some(if i % 2 == 0 {
2065                    f16::NAN
2066                } else {
2067                    f16::from_f32(i as f32)
2068                })
2069            })
2070            .collect::<Float16Array>();
2071
2072        let f32_values = (0..MEDIUM_SIZE)
2073            .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
2074            .collect::<Float32Array>();
2075
2076        let f64_values = (0..MEDIUM_SIZE)
2077            .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
2078            .collect::<Float64Array>();
2079
2080        let batch = RecordBatch::try_new(
2081            Arc::new(schema),
2082            vec![
2083                Arc::new(f16_values),
2084                Arc::new(f32_values),
2085                Arc::new(f64_values),
2086            ],
2087        )
2088        .unwrap();
2089
2090        roundtrip(batch, None);
2091    }
2092
2093    const SMALL_SIZE: usize = 7;
2094    const MEDIUM_SIZE: usize = 63;
2095
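    /// Round-trips `expected_batch` once per writer version, optionally capping the row
    /// group size, and asserts the data read back matches the input. Returns the written files.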
2096    fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<File> {
2097        let mut files = vec![];
2098        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2099            let mut props = WriterProperties::builder().set_writer_version(version);
2100
2101            if let Some(size) = max_row_group_size {
2102                props = props.set_max_row_group_size(size)
2103            }
2104
2105            let props = props.build();
2106            files.push(roundtrip_opts(&expected_batch, props))
2107        }
2108        files
2109    }
2110
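    /// Writes `expected_batch` to a temporary file with the given properties, reads it back,
    /// and runs `validate` against the expected and actual `ArrayData` of every column.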
2111    fn roundtrip_opts_with_array_validation<F>(
2112        expected_batch: &RecordBatch,
2113        props: WriterProperties,
2114        validate: F,
2115    ) -> File
2116    where
2117        F: Fn(&ArrayData, &ArrayData),
2118    {
2119        let file = tempfile::tempfile().unwrap();
2120
2121        let mut writer = ArrowWriter::try_new(
2122            file.try_clone().unwrap(),
2123            expected_batch.schema(),
2124            Some(props),
2125        )
2126        .expect("Unable to write file");
2127        writer.write(expected_batch).unwrap();
2128        writer.close().unwrap();
2129
2130        let mut record_batch_reader =
2131            ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
2132
2133        let actual_batch = record_batch_reader
2134            .next()
2135            .expect("No batch found")
2136            .expect("Unable to get batch");
2137
2138        assert_eq!(expected_batch.schema(), actual_batch.schema());
2139        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
2140        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
2141        for i in 0..expected_batch.num_columns() {
2142            let expected_data = expected_batch.column(i).to_data();
2143            let actual_data = actual_batch.column(i).to_data();
2144            validate(&expected_data, &actual_data);
2145        }
2146
2147        file
2148    }
2149
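    /// Round-trips `expected_batch` with the given properties, checking that every column is
    /// fully valid and equal to the input.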
2150    fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> File {
2151        roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
2152            a.validate_full().expect("valid expected data");
2153            b.validate_full().expect("valid actual data");
2154            assert_eq!(a, b)
2155        })
2156    }
2157
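    /// Options for `one_column_roundtrip_with_options`: the values and schema to write,
    /// and whether (and where) bloom filters should be written.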
2158    struct RoundTripOptions {
2159        values: ArrayRef,
2160        schema: SchemaRef,
2161        bloom_filter: bool,
2162        bloom_filter_position: BloomFilterPosition,
2163    }
2164
2165    impl RoundTripOptions {
2166        fn new(values: ArrayRef, nullable: bool) -> Self {
2167            let data_type = values.data_type().clone();
2168            let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
2169            Self {
2170                values,
2171                schema: Arc::new(schema),
2172                bloom_filter: false,
2173                bloom_filter_position: BloomFilterPosition::AfterRowGroup,
2174            }
2175        }
2176    }
2177
2178    fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<File> {
2179        one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
2180    }
2181
2182    fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<File> {
2183        let mut options = RoundTripOptions::new(values, false);
2184        options.schema = schema;
2185        one_column_roundtrip_with_options(options)
2186    }
2187
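    /// Round-trips a single column across a matrix of dictionary page size limits, encodings
    /// (chosen based on the value type), writer versions, and row group sizes.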
2188    fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<File> {
2189        let RoundTripOptions {
2190            values,
2191            schema,
2192            bloom_filter,
2193            bloom_filter_position,
2194        } = options;
2195
2196        let encodings = match values.data_type() {
2197            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
2198                vec![
2199                    Encoding::PLAIN,
2200                    Encoding::DELTA_BYTE_ARRAY,
2201                    Encoding::DELTA_LENGTH_BYTE_ARRAY,
2202                ]
2203            }
2204            DataType::Int64
2205            | DataType::Int32
2206            | DataType::Int16
2207            | DataType::Int8
2208            | DataType::UInt64
2209            | DataType::UInt32
2210            | DataType::UInt16
2211            | DataType::UInt8 => vec![
2212                Encoding::PLAIN,
2213                Encoding::DELTA_BINARY_PACKED,
2214                Encoding::BYTE_STREAM_SPLIT,
2215            ],
2216            DataType::Float32 | DataType::Float64 => {
2217                vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
2218            }
2219            _ => vec![Encoding::PLAIN],
2220        };
2221
2222        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2223
2224        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2225
2226        let mut files = vec![];
2227        for dictionary_size in [0, 1, 1024] {
2228            for encoding in &encodings {
2229                for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2230                    for row_group_size in row_group_sizes {
2231                        let props = WriterProperties::builder()
2232                            .set_writer_version(version)
2233                            .set_max_row_group_size(row_group_size)
2234                            .set_dictionary_enabled(dictionary_size != 0)
2235                            .set_dictionary_page_size_limit(dictionary_size.max(1))
2236                            .set_encoding(*encoding)
2237                            .set_bloom_filter_enabled(bloom_filter)
2238                            .set_bloom_filter_position(bloom_filter_position)
2239                            .build();
2240
2241                        files.push(roundtrip_opts(&expected_batch, props))
2242                    }
2243                }
2244            }
2245        }
2246        files
2247    }
2248
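    /// Round-trips a non-nullable single column built from `iter`.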
2249    fn values_required<A, I>(iter: I) -> Vec<File>
2250    where
2251        A: From<Vec<I::Item>> + Array + 'static,
2252        I: IntoIterator,
2253    {
2254        let raw_values: Vec<_> = iter.into_iter().collect();
2255        let values = Arc::new(A::from(raw_values));
2256        one_column_roundtrip(values, false)
2257    }
2258
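    /// Round-trips a nullable single column built from `iter`, with every other slot nulled out.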
2259    fn values_optional<A, I>(iter: I) -> Vec<File>
2260    where
2261        A: From<Vec<Option<I::Item>>> + Array + 'static,
2262        I: IntoIterator,
2263    {
2264        let optional_raw_values: Vec<_> = iter
2265            .into_iter()
2266            .enumerate()
2267            .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
2268            .collect();
2269        let optional_values = Arc::new(A::from(optional_raw_values));
2270        one_column_roundtrip(optional_values, true)
2271    }
2272
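    /// Runs both the required and optional single-column round-trips for the same input.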
2273    fn required_and_optional<A, I>(iter: I)
2274    where
2275        A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
2276        I: IntoIterator + Clone,
2277    {
2278        values_required::<A, I>(iter.clone());
2279        values_optional::<A, I>(iter);
2280    }
2281
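    /// Reads the bloom filters for `file_column` back from the first written file and asserts
    /// that every positive value is present and every negative value is absent.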
2282    fn check_bloom_filter<T: AsBytes>(
2283        files: Vec<File>,
2284        file_column: String,
2285        positive_values: Vec<T>,
2286        negative_values: Vec<T>,
2287    ) {
2288        files.into_iter().take(1).for_each(|file| {
2289            let file_reader = SerializedFileReader::new_with_options(
2290                file,
2291                ReadOptionsBuilder::new()
2292                    .with_reader_properties(
2293                        ReaderProperties::builder()
2294                            .set_read_bloom_filter(true)
2295                            .build(),
2296                    )
2297                    .build(),
2298            )
2299            .expect("Unable to open file as Parquet");
2300            let metadata = file_reader.metadata();
2301
2302            // Gets bloom filters from all row groups.
2303            let mut bloom_filters: Vec<_> = vec![];
2304            for (ri, row_group) in metadata.row_groups().iter().enumerate() {
2305                if let Some((column_index, _)) = row_group
2306                    .columns()
2307                    .iter()
2308                    .enumerate()
2309                    .find(|(_, column)| column.column_path().string() == file_column)
2310                {
2311                    let row_group_reader = file_reader
2312                        .get_row_group(ri)
2313                        .expect("Unable to read row group");
2314                    if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
2315                        bloom_filters.push(sbbf.clone());
2316                    } else {
2317                        panic!("No bloom filter for column named {file_column} found");
2318                    }
2319                } else {
2320                    panic!("No column named {file_column} found");
2321                }
2322            }
2323
2324            positive_values.iter().for_each(|value| {
2325                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2326                assert!(
2327                    found.is_some(),
2328                    "Value {:?} should be in bloom filter",
2329                    value.as_bytes()
2330                );
2331            });
2332
2333            negative_values.iter().for_each(|value| {
2334                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2335                assert!(
2336                    found.is_none(),
2337                    "Value {:?} should not be in bloom filter",
2338                    value.as_bytes()
2339                );
2340            });
2341        });
2342    }
2343
2344    #[test]
2345    fn all_null_primitive_single_column() {
2346        let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
2347        one_column_roundtrip(values, true);
2348    }
2349    #[test]
2350    fn null_single_column() {
2351        let values = Arc::new(NullArray::new(SMALL_SIZE));
2352        one_column_roundtrip(values, true);
2353        // null arrays are always nullable; a test with non-nullable nulls fails
2354    }
2355
2356    #[test]
2357    fn bool_single_column() {
2358        required_and_optional::<BooleanArray, _>(
2359            [true, false].iter().cycle().copied().take(SMALL_SIZE),
2360        );
2361    }
2362
2363    #[test]
2364    fn bool_large_single_column() {
2365        let values = Arc::new(
2366            [None, Some(true), Some(false)]
2367                .iter()
2368                .cycle()
2369                .copied()
2370                .take(200_000)
2371                .collect::<BooleanArray>(),
2372        );
2373        let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
2374        let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2375        let file = tempfile::tempfile().unwrap();
2376
2377        let mut writer =
2378            ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
2379                .expect("Unable to write file");
2380        writer.write(&expected_batch).unwrap();
2381        writer.close().unwrap();
2382    }
2383
2384    #[test]
2385    fn check_page_offset_index_with_nan() {
2386        let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
2387        let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
2388        let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2389
2390        let mut out = Vec::with_capacity(1024);
2391        let mut writer =
2392            ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
2393        writer.write(&batch).unwrap();
2394        let file_meta_data = writer.close().unwrap();
2395        for row_group in file_meta_data.row_groups {
2396            for column in row_group.columns {
2397                assert!(column.offset_index_offset.is_some());
2398                assert!(column.offset_index_length.is_some());
2399                assert!(column.column_index_offset.is_none());
2400                assert!(column.column_index_length.is_none());
2401            }
2402        }
2403    }
2404
2405    #[test]
2406    fn i8_single_column() {
2407        required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
2408    }
2409
2410    #[test]
2411    fn i16_single_column() {
2412        required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
2413    }
2414
2415    #[test]
2416    fn i32_single_column() {
2417        required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
2418    }
2419
2420    #[test]
2421    fn i64_single_column() {
2422        required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
2423    }
2424
2425    #[test]
2426    fn u8_single_column() {
2427        required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
2428    }
2429
2430    #[test]
2431    fn u16_single_column() {
2432        required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
2433    }
2434
2435    #[test]
2436    fn u32_single_column() {
2437        required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
2438    }
2439
2440    #[test]
2441    fn u64_single_column() {
2442        required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
2443    }
2444
2445    #[test]
2446    fn f32_single_column() {
2447        required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
2448    }
2449
2450    #[test]
2451    fn f64_single_column() {
2452        required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
2453    }
2454
2455    // The timestamp array types don't implement From<Vec<T>> because they need the timezone
2456    // argument, and they also don't support building from a Vec<Option<T>>, so call
2457    // one_column_roundtrip manually instead of calling required_and_optional for these tests.
2458
2459    #[test]
2460    fn timestamp_second_single_column() {
2461        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2462        let values = Arc::new(TimestampSecondArray::from(raw_values));
2463
2464        one_column_roundtrip(values, false);
2465    }
2466
2467    #[test]
2468    fn timestamp_millisecond_single_column() {
2469        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2470        let values = Arc::new(TimestampMillisecondArray::from(raw_values));
2471
2472        one_column_roundtrip(values, false);
2473    }
2474
2475    #[test]
2476    fn timestamp_microsecond_single_column() {
2477        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2478        let values = Arc::new(TimestampMicrosecondArray::from(raw_values));
2479
2480        one_column_roundtrip(values, false);
2481    }
2482
2483    #[test]
2484    fn timestamp_nanosecond_single_column() {
2485        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2486        let values = Arc::new(TimestampNanosecondArray::from(raw_values));
2487
2488        one_column_roundtrip(values, false);
2489    }
2490
2491    #[test]
2492    fn date32_single_column() {
2493        required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
2494    }
2495
2496    #[test]
2497    fn date64_single_column() {
2498        // Date64 must be a multiple of 86400000, see ARROW-10925
2499        required_and_optional::<Date64Array, _>(
2500            (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
2501        );
2502    }
2503
2504    #[test]
2505    fn time32_second_single_column() {
2506        required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
2507    }
2508
2509    #[test]
2510    fn time32_millisecond_single_column() {
2511        required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
2512    }
2513
2514    #[test]
2515    fn time64_microsecond_single_column() {
2516        required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
2517    }
2518
2519    #[test]
2520    fn time64_nanosecond_single_column() {
2521        required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
2522    }
2523
2524    #[test]
2525    fn duration_second_single_column() {
2526        required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
2527    }
2528
2529    #[test]
2530    fn duration_millisecond_single_column() {
2531        required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
2532    }
2533
2534    #[test]
2535    fn duration_microsecond_single_column() {
2536        required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
2537    }
2538
2539    #[test]
2540    fn duration_nanosecond_single_column() {
2541        required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
2542    }
2543
2544    #[test]
2545    fn interval_year_month_single_column() {
2546        required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
2547    }
2548
2549    #[test]
2550    fn interval_day_time_single_column() {
2551        required_and_optional::<IntervalDayTimeArray, _>(vec![
2552            IntervalDayTime::new(0, 1),
2553            IntervalDayTime::new(0, 3),
2554            IntervalDayTime::new(3, -2),
2555            IntervalDayTime::new(-200, 4),
2556        ]);
2557    }
2558
2559    #[test]
2560    #[should_panic(
2561        expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
2562    )]
2563    fn interval_month_day_nano_single_column() {
2564        required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
2565            IntervalMonthDayNano::new(0, 1, 5),
2566            IntervalMonthDayNano::new(0, 3, 2),
2567            IntervalMonthDayNano::new(3, -2, -5),
2568            IntervalMonthDayNano::new(-200, 4, -1),
2569        ]);
2570    }
2571
2572    #[test]
2573    fn binary_single_column() {
2574        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2575        let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2576        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2577
2578        // BinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2579        values_required::<BinaryArray, _>(many_vecs_iter);
2580    }
2581
2582    #[test]
2583    fn binary_view_single_column() {
2584        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2585        let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2586        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2587
2588        // BinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2589        values_required::<BinaryViewArray, _>(many_vecs_iter);
2590    }
2591
2592    #[test]
2593    fn i32_column_bloom_filter_at_end() {
2594        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2595        let mut options = RoundTripOptions::new(array, false);
2596        options.bloom_filter = true;
2597        options.bloom_filter_position = BloomFilterPosition::End;
2598
2599        let files = one_column_roundtrip_with_options(options);
2600        check_bloom_filter(
2601            files,
2602            "col".to_string(),
2603            (0..SMALL_SIZE as i32).collect(),
2604            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2605        );
2606    }
2607
2608    #[test]
2609    fn i32_column_bloom_filter() {
2610        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2611        let mut options = RoundTripOptions::new(array, false);
2612        options.bloom_filter = true;
2613
2614        let files = one_column_roundtrip_with_options(options);
2615        check_bloom_filter(
2616            files,
2617            "col".to_string(),
2618            (0..SMALL_SIZE as i32).collect(),
2619            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2620        );
2621    }
2622
2623    #[test]
2624    fn binary_column_bloom_filter() {
2625        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2626        let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2627        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2628
2629        let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
2630        let mut options = RoundTripOptions::new(array, false);
2631        options.bloom_filter = true;
2632
2633        let files = one_column_roundtrip_with_options(options);
2634        check_bloom_filter(
2635            files,
2636            "col".to_string(),
2637            many_vecs,
2638            vec![vec![(SMALL_SIZE + 1) as u8]],
2639        );
2640    }
2641
2642    #[test]
2643    fn empty_string_null_column_bloom_filter() {
2644        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2645        let raw_strs = raw_values.iter().map(|s| s.as_str());
2646
2647        let array = Arc::new(StringArray::from_iter_values(raw_strs));
2648        let mut options = RoundTripOptions::new(array, false);
2649        options.bloom_filter = true;
2650
2651        let files = one_column_roundtrip_with_options(options);
2652
2653        let optional_raw_values: Vec<_> = raw_values
2654            .iter()
2655            .enumerate()
2656            .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
2657            .collect();
2658        // For null slots, empty string should not be in bloom filter.
2659        check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
2660    }
2661
2662    #[test]
2663    fn large_binary_single_column() {
2664        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2665        let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2666        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2667
2668        // LargeBinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2669        values_required::<LargeBinaryArray, _>(many_vecs_iter);
2670    }
2671
2672    #[test]
2673    fn fixed_size_binary_single_column() {
2674        let mut builder = FixedSizeBinaryBuilder::new(4);
2675        builder.append_value(b"0123").unwrap();
2676        builder.append_null();
2677        builder.append_value(b"8910").unwrap();
2678        builder.append_value(b"1112").unwrap();
2679        let array = Arc::new(builder.finish());
2680
2681        one_column_roundtrip(array, true);
2682    }
2683
2684    #[test]
2685    fn string_single_column() {
2686        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2687        let raw_strs = raw_values.iter().map(|s| s.as_str());
2688
2689        required_and_optional::<StringArray, _>(raw_strs);
2690    }
2691
2692    #[test]
2693    fn large_string_single_column() {
2694        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2695        let raw_strs = raw_values.iter().map(|s| s.as_str());
2696
2697        required_and_optional::<LargeStringArray, _>(raw_strs);
2698    }
2699
2700    #[test]
2701    fn string_view_single_column() {
2702        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2703        let raw_strs = raw_values.iter().map(|s| s.as_str());
2704
2705        required_and_optional::<StringViewArray, _>(raw_strs);
2706    }
2707
2708    #[test]
2709    fn null_list_single_column() {
2710        let null_field = Field::new_list_field(DataType::Null, true);
2711        let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);
2712
2713        let schema = Schema::new(vec![list_field]);
2714
2715        // Build [[], null, [null, null]]
2716        let a_values = NullArray::new(2);
2717        let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
2718        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2719            DataType::Null,
2720            true,
2721        ))))
2722        .len(3)
2723        .add_buffer(a_value_offsets)
2724        .null_bit_buffer(Some(Buffer::from([0b00000101])))
2725        .add_child_data(a_values.into_data())
2726        .build()
2727        .unwrap();
2728
2729        let a = ListArray::from(a_list_data);
2730
2731        assert!(a.is_valid(0));
2732        assert!(!a.is_valid(1));
2733        assert!(a.is_valid(2));
2734
2735        assert_eq!(a.value(0).len(), 0);
2736        assert_eq!(a.value(2).len(), 2);
2737        assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);
2738
2739        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2740        roundtrip(batch, None);
2741    }
2742
2743    #[test]
2744    fn list_single_column() {
2745        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2746        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2747        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2748            DataType::Int32,
2749            false,
2750        ))))
2751        .len(5)
2752        .add_buffer(a_value_offsets)
2753        .null_bit_buffer(Some(Buffer::from([0b00011011])))
2754        .add_child_data(a_values.into_data())
2755        .build()
2756        .unwrap();
2757
2758        assert_eq!(a_list_data.null_count(), 1);
2759
2760        let a = ListArray::from(a_list_data);
2761        let values = Arc::new(a);
2762
2763        one_column_roundtrip(values, true);
2764    }
2765
2766    #[test]
2767    fn large_list_single_column() {
2768        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2769        let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
2770        let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
2771            "large_item",
2772            DataType::Int32,
2773            true,
2774        ))))
2775        .len(5)
2776        .add_buffer(a_value_offsets)
2777        .add_child_data(a_values.into_data())
2778        .null_bit_buffer(Some(Buffer::from([0b00011011])))
2779        .build()
2780        .unwrap();
2781
2782        // I think this setup is incorrect because this should pass
2783        assert_eq!(a_list_data.null_count(), 1);
2784
2785        let a = LargeListArray::from(a_list_data);
2786        let values = Arc::new(a);
2787
2788        one_column_roundtrip(values, true);
2789    }
2790
2791    #[test]
2792    fn list_nested_nulls() {
2793        use arrow::datatypes::Int32Type;
2794        let data = vec![
2795            Some(vec![Some(1)]),
2796            Some(vec![Some(2), Some(3)]),
2797            None,
2798            Some(vec![Some(4), Some(5), None]),
2799            Some(vec![None]),
2800            Some(vec![Some(6), Some(7)]),
2801        ];
2802
2803        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
2804        one_column_roundtrip(Arc::new(list), true);
2805
2806        let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
2807        one_column_roundtrip(Arc::new(list), true);
2808    }
2809
2810    #[test]
2811    fn struct_single_column() {
2812        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2813        let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
2814        let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
2815
2816        let values = Arc::new(s);
2817        one_column_roundtrip(values, false);
2818    }
2819
2820    #[test]
2821    fn list_and_map_coerced_names() {
2822        // Create map and list with non-Parquet naming
2823        let list_field =
2824            Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
2825        let map_field = Field::new_map(
2826            "my_map",
2827            "entries",
2828            Field::new("keys", DataType::Int32, false),
2829            Field::new("values", DataType::Int32, true),
2830            false,
2831            true,
2832        );
2833
2834        let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
2835        let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();
2836
2837        let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
2838
2839        // Write data to Parquet but coerce names to match spec
2840        let props = Some(WriterProperties::builder().set_coerce_types(true).build());
2841        let file = tempfile::tempfile().unwrap();
2842        let mut writer =
2843            ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();
2844
2845        let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
2846        writer.write(&batch).unwrap();
2847        let file_metadata = writer.close().unwrap();
2848
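        // The returned Thrift metadata stores the schema as a flattened, depth-first list of
        // SchemaElements (root, my_list, list, element, my_map, key_value, key, value),
        // hence the indices used below.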
2849        // Coerced name of "item" should be "element"
2850        assert_eq!(file_metadata.schema[3].name, "element");
2851        // Coerced name of "entries" should be "key_value"
2852        assert_eq!(file_metadata.schema[5].name, "key_value");
2853        // Coerced name of "keys" should be "key"
2854        assert_eq!(file_metadata.schema[6].name, "key");
2855        // Coerced name of "values" should be "value"
2856        assert_eq!(file_metadata.schema[7].name, "value");
2857
2858        // Double check schema after reading from the file
2859        let reader = SerializedFileReader::new(file).unwrap();
2860        let file_schema = reader.metadata().file_metadata().schema();
2861        let fields = file_schema.get_fields();
2862        let list_field = &fields[0].get_fields()[0];
2863        assert_eq!(list_field.get_fields()[0].name(), "element");
2864        let map_field = &fields[1].get_fields()[0];
2865        assert_eq!(map_field.name(), "key_value");
2866        assert_eq!(map_field.get_fields()[0].name(), "key");
2867        assert_eq!(map_field.get_fields()[1].name(), "value");
2868    }
2869
2870    #[test]
2871    fn fallback_flush_data_page() {
2872        // Tests that Fallback::flush_data_page clears all buffers correctly
2873        let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
2874        let values = Arc::new(StringArray::from(raw_values));
2875        let encodings = vec![
2876            Encoding::DELTA_BYTE_ARRAY,
2877            Encoding::DELTA_LENGTH_BYTE_ARRAY,
2878        ];
2879        let data_type = values.data_type().clone();
2880        let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
2881        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2882
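        // A tiny data page size limit and small write batch size force frequent data page
        // flushes, so the fallback encoder must repeatedly flush and clear its buffers.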
2883        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2884        let data_page_size_limit: usize = 32;
2885        let write_batch_size: usize = 16;
2886
2887        for encoding in &encodings {
2888            for row_group_size in row_group_sizes {
2889                let props = WriterProperties::builder()
2890                    .set_writer_version(WriterVersion::PARQUET_2_0)
2891                    .set_max_row_group_size(row_group_size)
2892                    .set_dictionary_enabled(false)
2893                    .set_encoding(*encoding)
2894                    .set_data_page_size_limit(data_page_size_limit)
2895                    .set_write_batch_size(write_batch_size)
2896                    .build();
2897
2898                roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
2899                    let string_array_a = StringArray::from(a.clone());
2900                    let string_array_b = StringArray::from(b.clone());
2901                    let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
2902                    let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
2903                    assert_eq!(
2904                        vec_a, vec_b,
2905                        "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
2906                    );
2907                });
2908            }
2909        }
2910    }
2911
2912    #[test]
2913    fn arrow_writer_string_dictionary() {
2914        // define schema
2915        #[allow(deprecated)]
2916        let schema = Arc::new(Schema::new(vec![Field::new_dict(
2917            "dictionary",
2918            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2919            true,
2920            42,
2921            true,
2922        )]));
2923
2924        // create some data
2925        let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
2926            .iter()
2927            .copied()
2928            .collect();
2929
2930        // build a record batch
2931        one_column_roundtrip_with_schema(Arc::new(d), schema);
2932    }
2933
2934    #[test]
2935    fn arrow_writer_primitive_dictionary() {
2936        // define schema
2937        #[allow(deprecated)]
2938        let schema = Arc::new(Schema::new(vec![Field::new_dict(
2939            "dictionary",
2940            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
2941            true,
2942            42,
2943            true,
2944        )]));
2945
2946        // create some data
2947        let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
2948        builder.append(12345678).unwrap();
2949        builder.append_null();
2950        builder.append(22345678).unwrap();
2951        builder.append(12345678).unwrap();
2952        let d = builder.finish();
2953
2954        one_column_roundtrip_with_schema(Arc::new(d), schema);
2955    }
2956
2957    #[test]
2958    fn arrow_writer_decimal128_dictionary() {
2959        let integers = vec![12345, 56789, 34567];
2960
2961        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
2962
2963        let values = Decimal128Array::from(integers.clone())
2964            .with_precision_and_scale(5, 2)
2965            .unwrap();
2966
2967        let array = DictionaryArray::new(keys, Arc::new(values));
2968        one_column_roundtrip(Arc::new(array.clone()), true);
2969
2970        let values = Decimal128Array::from(integers)
2971            .with_precision_and_scale(12, 2)
2972            .unwrap();
2973
2974        let array = array.with_values(Arc::new(values));
2975        one_column_roundtrip(Arc::new(array), true);
2976    }
2977
2978    #[test]
2979    fn arrow_writer_decimal256_dictionary() {
2980        let integers = vec![
2981            i256::from_i128(12345),
2982            i256::from_i128(56789),
2983            i256::from_i128(34567),
2984        ];
2985
2986        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
2987
2988        let values = Decimal256Array::from(integers.clone())
2989            .with_precision_and_scale(5, 2)
2990            .unwrap();
2991
2992        let array = DictionaryArray::new(keys, Arc::new(values));
2993        one_column_roundtrip(Arc::new(array.clone()), true);
2994
2995        let values = Decimal256Array::from(integers)
2996            .with_precision_and_scale(12, 2)
2997            .unwrap();
2998
2999        let array = array.with_values(Arc::new(values));
3000        one_column_roundtrip(Arc::new(array), true);
3001    }
3002
3003    #[test]
3004    fn arrow_writer_string_dictionary_unsigned_index() {
3005        // define schema
3006        #[allow(deprecated)]
3007        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3008            "dictionary",
3009            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3010            true,
3011            42,
3012            true,
3013        )]));
3014
3015        // create some data
3016        let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3017            .iter()
3018            .copied()
3019            .collect();
3020
3021        one_column_roundtrip_with_schema(Arc::new(d), schema);
3022    }
3023
3024    #[test]
3025    fn u32_min_max() {
3026        // check values roundtrip through parquet
3027        let src = [
3028            u32::MIN,
3029            u32::MIN + 1,
3030            (i32::MAX as u32) - 1,
3031            i32::MAX as u32,
3032            (i32::MAX as u32) + 1,
3033            u32::MAX - 1,
3034            u32::MAX,
3035        ];
3036        let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
3037        let files = one_column_roundtrip(values, false);
3038
3039        for file in files {
3040            // check statistics are valid
3041            let reader = SerializedFileReader::new(file).unwrap();
3042            let metadata = reader.metadata();
3043
3044            let mut row_offset = 0;
3045            for row_group in metadata.row_groups() {
3046                assert_eq!(row_group.num_columns(), 1);
3047                let column = row_group.column(0);
3048
3049                let num_values = column.num_values() as usize;
3050                let src_slice = &src[row_offset..row_offset + num_values];
3051                row_offset += num_values;
3052
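                // UInt32 is written with the Parquet physical type INT32, so the stored
                // statistics are i32 values that must be reinterpreted as u32 before comparing.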
3053                let stats = column.statistics().unwrap();
3054                if let Statistics::Int32(stats) = stats {
3055                    assert_eq!(
3056                        *stats.min_opt().unwrap() as u32,
3057                        *src_slice.iter().min().unwrap()
3058                    );
3059                    assert_eq!(
3060                        *stats.max_opt().unwrap() as u32,
3061                        *src_slice.iter().max().unwrap()
3062                    );
3063                } else {
3064                    panic!("Statistics::Int32 missing")
3065                }
3066            }
3067        }
3068    }
3069
3070    #[test]
3071    fn u64_min_max() {
3072        // check values roundtrip through parquet
3073        let src = [
3074            u64::MIN,
3075            u64::MIN + 1,
3076            (i64::MAX as u64) - 1,
3077            i64::MAX as u64,
3078            (i64::MAX as u64) + 1,
3079            u64::MAX - 1,
3080            u64::MAX,
3081        ];
3082        let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
3083        let files = one_column_roundtrip(values, false);
3084
3085        for file in files {
3086            // check statistics are valid
3087            let reader = SerializedFileReader::new(file).unwrap();
3088            let metadata = reader.metadata();
3089
3090            let mut row_offset = 0;
3091            for row_group in metadata.row_groups() {
3092                assert_eq!(row_group.num_columns(), 1);
3093                let column = row_group.column(0);
3094
3095                let num_values = column.num_values() as usize;
3096                let src_slice = &src[row_offset..row_offset + num_values];
3097                row_offset += num_values;
3098
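                // As with u32, UInt64 is written as physical INT64; reinterpret the stored
                // i64 statistics as u64 before comparing.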
3099                let stats = column.statistics().unwrap();
3100                if let Statistics::Int64(stats) = stats {
3101                    assert_eq!(
3102                        *stats.min_opt().unwrap() as u64,
3103                        *src_slice.iter().min().unwrap()
3104                    );
3105                    assert_eq!(
3106                        *stats.max_opt().unwrap() as u64,
3107                        *src_slice.iter().max().unwrap()
3108                    );
3109                } else {
3110                    panic!("Statistics::Int64 missing")
3111                }
3112            }
3113        }
3114    }
3115
3116    #[test]
3117    fn statistics_null_counts_only_nulls() {
3118        // check that null count statistics are correct for columns containing only NULLs
3119        let values = Arc::new(UInt64Array::from(vec![None, None]));
3120        let files = one_column_roundtrip(values, true);
3121
3122        for file in files {
3123            // check statistics are valid
3124            let reader = SerializedFileReader::new(file).unwrap();
3125            let metadata = reader.metadata();
3126            assert_eq!(metadata.num_row_groups(), 1);
3127            let row_group = metadata.row_group(0);
3128            assert_eq!(row_group.num_columns(), 1);
3129            let column = row_group.column(0);
3130            let stats = column.statistics().unwrap();
3131            assert_eq!(stats.null_count_opt(), Some(2));
3132        }
3133    }
3134
3135    #[test]
3136    fn test_list_of_struct_roundtrip() {
3137        // define schema
3138        let int_field = Field::new("a", DataType::Int32, true);
3139        let int_field2 = Field::new("b", DataType::Int32, true);
3140
3141        let int_builder = Int32Builder::with_capacity(10);
3142        let int_builder2 = Int32Builder::with_capacity(10);
3143
3144        let struct_builder = StructBuilder::new(
3145            vec![int_field, int_field2],
3146            vec![Box::new(int_builder), Box::new(int_builder2)],
3147        );
3148        let mut list_builder = ListBuilder::new(struct_builder);
3149
3150        // Construct the following array
3151        // [{a: 1, b: 2}], [], null, [null, null], [{a: null, b: 3}], [{a: 2, b: null}]
3152
3153        // [{a: 1, b: 2}]
3154        let values = list_builder.values();
3155        values
3156            .field_builder::<Int32Builder>(0)
3157            .unwrap()
3158            .append_value(1);
3159        values
3160            .field_builder::<Int32Builder>(1)
3161            .unwrap()
3162            .append_value(2);
3163        values.append(true);
3164        list_builder.append(true);
3165
3166        // []
3167        list_builder.append(true);
3168
3169        // null
3170        list_builder.append(false);
3171
3172        // [null, null]
3173        let values = list_builder.values();
3174        values
3175            .field_builder::<Int32Builder>(0)
3176            .unwrap()
3177            .append_null();
3178        values
3179            .field_builder::<Int32Builder>(1)
3180            .unwrap()
3181            .append_null();
3182        values.append(false);
3183        values
3184            .field_builder::<Int32Builder>(0)
3185            .unwrap()
3186            .append_null();
3187        values
3188            .field_builder::<Int32Builder>(1)
3189            .unwrap()
3190            .append_null();
3191        values.append(false);
3192        list_builder.append(true);
3193
3194        // [{a: null, b: 3}]
3195        let values = list_builder.values();
3196        values
3197            .field_builder::<Int32Builder>(0)
3198            .unwrap()
3199            .append_null();
3200        values
3201            .field_builder::<Int32Builder>(1)
3202            .unwrap()
3203            .append_value(3);
3204        values.append(true);
3205        list_builder.append(true);
3206
3207        // [{a: 2, b: null}]
3208        let values = list_builder.values();
3209        values
3210            .field_builder::<Int32Builder>(0)
3211            .unwrap()
3212            .append_value(2);
3213        values
3214            .field_builder::<Int32Builder>(1)
3215            .unwrap()
3216            .append_null();
3217        values.append(true);
3218        list_builder.append(true);
3219
3220        let array = Arc::new(list_builder.finish());
3221
3222        one_column_roundtrip(array, true);
3223    }
3224
3225    fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
3226        metadata.row_groups().iter().map(|x| x.num_rows()).collect()
3227    }
3228
3229    #[test]
3230    fn test_aggregates_records() {
3231        let arrays = [
3232            Int32Array::from((0..100).collect::<Vec<_>>()),
3233            Int32Array::from((0..50).collect::<Vec<_>>()),
3234            Int32Array::from((200..500).collect::<Vec<_>>()),
3235        ];
3236
3237        let schema = Arc::new(Schema::new(vec![Field::new(
3238            "int",
3239            ArrowDataType::Int32,
3240            false,
3241        )]));
3242
3243        let file = tempfile::tempfile().unwrap();
3244
3245        let props = WriterProperties::builder()
3246            .set_max_row_group_size(200)
3247            .build();
3248
3249        let mut writer =
3250            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
3251
3252        for array in arrays {
3253            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
3254            writer.write(&batch).unwrap();
3255        }
3256
3257        writer.close().unwrap();
3258
3259        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
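        // 450 rows were written in total (100 + 50 + 300); with max_row_group_size = 200
        // they are re-chunked into row groups of 200, 200 and 50 rows.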
3260        assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);
3261
3262        let batches = builder
3263            .with_batch_size(100)
3264            .build()
3265            .unwrap()
3266            .collect::<ArrowResult<Vec<_>>>()
3267            .unwrap();
3268
3269        assert_eq!(batches.len(), 5);
3270        assert!(batches.iter().all(|x| x.num_columns() == 1));
3271
3272        let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3273
3274        assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
3275
3276        let values: Vec<_> = batches
3277            .iter()
3278            .flat_map(|x| {
3279                x.column(0)
3280                    .as_any()
3281                    .downcast_ref::<Int32Array>()
3282                    .unwrap()
3283                    .values()
3284                    .iter()
3285                    .cloned()
3286            })
3287            .collect();
3288
3289        let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
3290        assert_eq!(&values, &expected_values)
3291    }
3292
3293    #[test]
3294    fn complex_aggregate() {
3295        // Tests aggregating nested data
3296        let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
3297        let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
3298        let struct_a = Arc::new(Field::new(
3299            "struct_a",
3300            DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
3301            true,
3302        ));
3303
3304        let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
3305        let struct_b = Arc::new(Field::new(
3306            "struct_b",
3307            DataType::Struct(vec![list_a.clone()].into()),
3308            false,
3309        ));
3310
3311        let schema = Arc::new(Schema::new(vec![struct_b]));
3312
3313        // create nested data
3314        let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
3315        let field_b_array =
3316            Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);
3317
3318        let struct_a_array = StructArray::from(vec![
3319            (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
3320            (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
3321        ]);
3322
3323        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3324            .len(5)
3325            .add_buffer(Buffer::from_iter(vec![
3326                0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
3327            ]))
3328            .null_bit_buffer(Some(Buffer::from_iter(vec![
3329                true, false, true, false, true,
3330            ])))
3331            .child_data(vec![struct_a_array.into_data()])
3332            .build()
3333            .unwrap();
3334
3335        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3336        let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);
3337
3338        let batch1 =
3339            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3340                .unwrap();
3341
3342        let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
3343        let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);
3344
3345        let struct_a_array = StructArray::from(vec![
3346            (field_a, Arc::new(field_a_array) as ArrayRef),
3347            (field_b, Arc::new(field_b_array) as ArrayRef),
3348        ]);
3349
3350        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3351            .len(2)
3352            .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
3353            .child_data(vec![struct_a_array.into_data()])
3354            .build()
3355            .unwrap();
3356
3357        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3358        let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);
3359
3360        let batch2 =
3361            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3362                .unwrap();
3363
3364        let batches = &[batch1, batch2];
3365
3366        // Verify data is as expected
3367
3368        let expected = r#"
3369            +-------------------------------------------------------------------------------------------------------+
3370            | struct_b                                                                                              |
3371            +-------------------------------------------------------------------------------------------------------+
3372            | {list: [{leaf_a: 1, leaf_b: 1}]}                                                                      |
3373            | {list: }                                                                                              |
3374            | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]}                                               |
3375            | {list: }                                                                                              |
3376            | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]}                                                |
3377            | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
3378            | {list: [{leaf_a: 10, leaf_b: }]}                                                                      |
3379            +-------------------------------------------------------------------------------------------------------+
3380        "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");
3381
3382        let actual = pretty_format_batches(batches).unwrap().to_string();
3383        assert_eq!(actual, expected);
3384
3385        // Write data
3386        let file = tempfile::tempfile().unwrap();
3387        let props = WriterProperties::builder()
3388            .set_max_row_group_size(6)
3389            .build();
3390
3391        let mut writer =
3392            ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
3393
3394        for batch in batches {
3395            writer.write(batch).unwrap();
3396        }
3397        writer.close().unwrap();
3398
3399        // Read Data
3400        // Should have written the entire first batch and the first row of the second batch
3401        // to the first row group, leaving a single row in the second row group
3402
3403        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3404        assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);
3405
3406        let batches = builder
3407            .with_batch_size(2)
3408            .build()
3409            .unwrap()
3410            .collect::<ArrowResult<Vec<_>>>()
3411            .unwrap();
3412
3413        assert_eq!(batches.len(), 4);
3414        let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3415        assert_eq!(&batch_counts, &[2, 2, 2, 1]);
3416
3417        let actual = pretty_format_batches(&batches).unwrap().to_string();
3418        assert_eq!(actual, expected);
3419    }
3420
3421    #[test]
3422    fn test_arrow_writer_metadata() {
3423        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3424        let file_schema = batch_schema.clone().with_metadata(
3425            vec![("foo".to_string(), "bar".to_string())]
3426                .into_iter()
3427                .collect(),
3428        );
3429
3430        let batch = RecordBatch::try_new(
3431            Arc::new(batch_schema),
3432            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3433        )
3434        .unwrap();
3435
3436        let mut buf = Vec::with_capacity(1024);
3437        let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
3438        writer.write(&batch).unwrap();
3439        writer.close().unwrap();
3440    }
3441
3442    #[test]
3443    fn test_arrow_writer_nullable() {
3444        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3445        let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
3446        let file_schema = Arc::new(file_schema);
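        // The batch schema marks the column as non-nullable while the file schema allows
        // nulls; the write should succeed and readers should see the nullable file schema.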
3447
3448        let batch = RecordBatch::try_new(
3449            Arc::new(batch_schema),
3450            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3451        )
3452        .unwrap();
3453
3454        let mut buf = Vec::with_capacity(1024);
3455        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3456        writer.write(&batch).unwrap();
3457        writer.close().unwrap();
3458
3459        let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
3460        let back = read.next().unwrap().unwrap();
3461        assert_eq!(back.schema(), file_schema);
3462        assert_ne!(back.schema(), batch.schema());
3463        assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
3464    }
3465
3466    #[test]
3467    fn in_progress_accounting() {
3468        // define schema
3469        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3470
3471        // create some data
3472        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3473
3474        // build a record batch
3475        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
3476
3477        let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
3478
3479        // starts empty
3480        assert_eq!(writer.in_progress_size(), 0);
3481        assert_eq!(writer.in_progress_rows(), 0);
3482        assert_eq!(writer.memory_size(), 0);
3483        assert_eq!(writer.bytes_written(), 4); // Initial 4-byte "PAR1" magic header
3484        writer.write(&batch).unwrap();
3485
3486        // updated on write
3487        let initial_size = writer.in_progress_size();
3488        assert!(initial_size > 0);
3489        assert_eq!(writer.in_progress_rows(), 5);
3490        let initial_memory = writer.memory_size();
3491        assert!(initial_memory > 0);
3492        // memory estimate is larger than estimated encoded size
3493        assert!(
3494            initial_size <= initial_memory,
3495            "{initial_size} <= {initial_memory}"
3496        );
3497
3498        // updated on second write
3499        writer.write(&batch).unwrap();
3500        assert!(writer.in_progress_size() > initial_size);
3501        assert_eq!(writer.in_progress_rows(), 10);
3502        assert!(writer.memory_size() > initial_memory);
3503        assert!(
3504            writer.in_progress_size() <= writer.memory_size(),
3505            "in_progress_size {} <= memory_size {}",
3506            writer.in_progress_size(),
3507            writer.memory_size()
3508        );
3509
3510        // in progress tracking is cleared, but the overall data written is updated
3511        let pre_flush_bytes_written = writer.bytes_written();
3512        writer.flush().unwrap();
3513        assert_eq!(writer.in_progress_size(), 0);
3514        assert_eq!(writer.memory_size(), 0);
3515        assert!(writer.bytes_written() > pre_flush_bytes_written);
3516
3517        writer.close().unwrap();
3518    }
3519
3520    #[test]
3521    fn test_writer_all_null() {
3522        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3523        let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
3524        let batch = RecordBatch::try_from_iter(vec![
3525            ("a", Arc::new(a) as ArrayRef),
3526            ("b", Arc::new(b) as ArrayRef),
3527        ])
3528        .unwrap();
3529
3530        let mut buf = Vec::with_capacity(1024);
3531        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
3532        writer.write(&batch).unwrap();
3533        writer.close().unwrap();
3534
3535        let bytes = Bytes::from(buf);
3536        let options = ReadOptionsBuilder::new().with_page_index().build();
3537        let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
3538        let index = reader.metadata().offset_index().unwrap();
3539
3540        assert_eq!(index.len(), 1);
3541        assert_eq!(index[0].len(), 2); // 2 columns
3542        assert_eq!(index[0][0].page_locations().len(), 1); // 1 page
3543        assert_eq!(index[0][1].page_locations().len(), 1); // 1 page
3544    }
3545
3546    #[test]
3547    fn test_disabled_statistics_with_page() {
3548        let file_schema = Schema::new(vec![
3549            Field::new("a", DataType::Utf8, true),
3550            Field::new("b", DataType::Utf8, true),
3551        ]);
3552        let file_schema = Arc::new(file_schema);
3553
3554        let batch = RecordBatch::try_new(
3555            file_schema.clone(),
3556            vec![
3557                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3558                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3559            ],
3560        )
3561        .unwrap();
3562
3563        let props = WriterProperties::builder()
3564            .set_statistics_enabled(EnabledStatistics::None)
3565            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
3566            .build();
3567
3568        let mut buf = Vec::with_capacity(1024);
3569        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
3570        writer.write(&batch).unwrap();
3571
3572        let metadata = writer.close().unwrap();
3573        assert_eq!(metadata.row_groups.len(), 1);
3574        let row_group = &metadata.row_groups[0];
3575        assert_eq!(row_group.columns.len(), 2);
3576        // Column "a" has both offset and column index, as requested
3577        assert!(row_group.columns[0].offset_index_offset.is_some());
3578        assert!(row_group.columns[0].column_index_offset.is_some());
3579        // Column "b" should only have offset index
3580        assert!(row_group.columns[1].offset_index_offset.is_some());
3581        assert!(row_group.columns[1].column_index_offset.is_none());
3582
3583        let options = ReadOptionsBuilder::new().with_page_index().build();
3584        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
3585
3586        let row_group = reader.get_row_group(0).unwrap();
3587        let a_col = row_group.metadata().column(0);
3588        let b_col = row_group.metadata().column(1);
3589
3590        // Column chunk of column "a" should have chunk level statistics
3591        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
3592            let min = byte_array_stats.min_opt().unwrap();
3593            let max = byte_array_stats.max_opt().unwrap();
3594
3595            assert_eq!(min.as_bytes(), b"a");
3596            assert_eq!(max.as_bytes(), b"d");
3597        } else {
3598            panic!("expecting Statistics::ByteArray");
3599        }
3600
3601        // The column chunk for column "b" shouldn't have statistics
3602        assert!(b_col.statistics().is_none());
3603
3604        let offset_index = reader.metadata().offset_index().unwrap();
3605        assert_eq!(offset_index.len(), 1); // 1 row group
3606        assert_eq!(offset_index[0].len(), 2); // 2 columns
3607
3608        let column_index = reader.metadata().column_index().unwrap();
3609        assert_eq!(column_index.len(), 1); // 1 row group
3610        assert_eq!(column_index[0].len(), 2); // 2 columns
3611
3612        let a_idx = &column_index[0][0];
3613        assert!(matches!(a_idx, Index::BYTE_ARRAY(_)), "{a_idx:?}");
3614        let b_idx = &column_index[0][1];
3615        assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
3616    }
3617
3618    #[test]
3619    fn test_disabled_statistics_with_chunk() {
3620        let file_schema = Schema::new(vec![
3621            Field::new("a", DataType::Utf8, true),
3622            Field::new("b", DataType::Utf8, true),
3623        ]);
3624        let file_schema = Arc::new(file_schema);
3625
3626        let batch = RecordBatch::try_new(
3627            file_schema.clone(),
3628            vec![
3629                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3630                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3631            ],
3632        )
3633        .unwrap();
3634
3635        let props = WriterProperties::builder()
3636            .set_statistics_enabled(EnabledStatistics::None)
3637            .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
3638            .build();
3639
3640        let mut buf = Vec::with_capacity(1024);
3641        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
3642        writer.write(&batch).unwrap();
3643
3644        let metadata = writer.close().unwrap();
3645        assert_eq!(metadata.row_groups.len(), 1);
3646        let row_group = &metadata.row_groups[0];
3647        assert_eq!(row_group.columns.len(), 2);
3648        // Column "a" should only have offset index
3649        assert!(row_group.columns[0].offset_index_offset.is_some());
3650        assert!(row_group.columns[0].column_index_offset.is_none());
3651        // Column "b" should only have offset index
3652        assert!(row_group.columns[1].offset_index_offset.is_some());
3653        assert!(row_group.columns[1].column_index_offset.is_none());
3654
3655        let options = ReadOptionsBuilder::new().with_page_index().build();
3656        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
3657
3658        let row_group = reader.get_row_group(0).unwrap();
3659        let a_col = row_group.metadata().column(0);
3660        let b_col = row_group.metadata().column(1);
3661
3662        // Column chunk of column "a" should have chunk level statistics
3663        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
3664            let min = byte_array_stats.min_opt().unwrap();
3665            let max = byte_array_stats.max_opt().unwrap();
3666
3667            assert_eq!(min.as_bytes(), b"a");
3668            assert_eq!(max.as_bytes(), b"d");
3669        } else {
3670            panic!("expecting Statistics::ByteArray");
3671        }
3672
3673        // The column chunk for column "b" shouldn't have statistics
3674        assert!(b_col.statistics().is_none());
3675
3676        let column_index = reader.metadata().column_index().unwrap();
3677        assert_eq!(column_index.len(), 1); // 1 row group
3678        assert_eq!(column_index[0].len(), 2); // 2 columns
3679
3680        let a_idx = &column_index[0][0];
3681        assert!(matches!(a_idx, Index::NONE), "{a_idx:?}");
3682        let b_idx = &column_index[0][1];
3683        assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
3684    }
3685
3686    #[test]
3687    fn test_arrow_writer_skip_metadata() {
3688        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3689        let file_schema = Arc::new(batch_schema.clone());
3690
3691        let batch = RecordBatch::try_new(
3692            Arc::new(batch_schema),
3693            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3694        )
3695        .unwrap();
3696        let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
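        // Skipping Arrow metadata omits the serialized Arrow schema (the ARROW:schema
        // key-value entry) from the footer; the schema must round-trip via the Parquet
        // schema alone.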
3697
3698        let mut buf = Vec::with_capacity(1024);
3699        let mut writer =
3700            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
3701        writer.write(&batch).unwrap();
3702        writer.close().unwrap();
3703
3704        let bytes = Bytes::from(buf);
3705        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
3706        assert_eq!(file_schema, *reader_builder.schema());
3707        if let Some(key_value_metadata) = reader_builder
3708            .metadata()
3709            .file_metadata()
3710            .key_value_metadata()
3711        {
3712            assert!(!key_value_metadata
3713                .iter()
3714                .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY));
3715        }
3716    }
3717
3718    #[test]
3719    fn mismatched_schemas() {
3720        let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
3721        let file_schema = Arc::new(Schema::new(vec![Field::new(
3722            "temperature",
3723            DataType::Float64,
3724            false,
3725        )]));
3726
3727        let batch = RecordBatch::try_new(
3728            Arc::new(batch_schema),
3729            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3730        )
3731        .unwrap();
3732
3733        let mut buf = Vec::with_capacity(1024);
3734        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3735
3736        let err = writer.write(&batch).unwrap_err().to_string();
3737        assert_eq!(
3738            err,
3739            "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
3740        );
3741    }
3742
3743    #[test]
3744    // https://github.com/apache/arrow-rs/issues/6988
3745    fn test_roundtrip_empty_schema() {
3746        // create empty record batch with empty schema
3747        let empty_batch = RecordBatch::try_new_with_options(
3748            Arc::new(Schema::empty()),
3749            vec![],
3750            &RecordBatchOptions::default().with_row_count(Some(0)),
3751        )
3752        .unwrap();
3753
3754        // write to parquet
3755        let mut parquet_bytes: Vec<u8> = Vec::new();
3756        let mut writer =
3757            ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
3758        writer.write(&empty_batch).unwrap();
3759        writer.close().unwrap();
3760
3761        // read from parquet
3762        let bytes = Bytes::from(parquet_bytes);
3763        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
3764        assert_eq!(reader.schema(), &empty_batch.schema());
3765        let batches: Vec<_> = reader
3766            .build()
3767            .unwrap()
3768            .collect::<ArrowResult<Vec<_>>>()
3769            .unwrap();
3770        assert_eq!(batches.len(), 0);
3771    }
3772
3773    #[test]
3774    fn test_page_stats_truncation() {
3775        let string_field = Field::new("a", DataType::Utf8, false);
3776        let binary_field = Field::new("b", DataType::Binary, false);
3777        let schema = Schema::new(vec![string_field, binary_field]);
3778
3779        let raw_string_values = vec!["Blart Versenwald III"];
3780        let raw_binary_values = [b"Blart Versenwald III".to_vec()];
3781        let raw_binary_value_refs = raw_binary_values
3782            .iter()
3783            .map(|x| x.as_slice())
3784            .collect::<Vec<_>>();
3785
3786        let string_values = StringArray::from(raw_string_values.clone());
3787        let binary_values = BinaryArray::from(raw_binary_value_refs);
3788        let batch = RecordBatch::try_new(
3789            Arc::new(schema),
3790            vec![Arc::new(string_values), Arc::new(binary_values)],
3791        )
3792        .unwrap();
3793
3794        let props = WriterProperties::builder()
3795            .set_statistics_truncate_length(Some(2))
3796            .set_dictionary_enabled(false)
3797            .set_encoding(Encoding::PLAIN)
3798            .set_compression(crate::basic::Compression::UNCOMPRESSED)
3799            .build();
3800
3801        let mut file = roundtrip_opts(&batch, props);
3802
3803        // read file and decode page headers
3804        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
3805        let mut buf = vec![];
3806        file.seek(std::io::SeekFrom::Start(0)).unwrap();
3807        let read = file.read_to_end(&mut buf).unwrap();
3808        assert!(read > 0);
3809
3810        // decode first page header (the slice skips the 4-byte "PAR1" file magic)
3811        let first_page = &buf[4..];
3812        let mut prot = TCompactSliceInputProtocol::new(first_page);
3813        let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
3814        let stats = hdr.data_page_header.unwrap().statistics;
3815        assert!(stats.is_some());
3816        let stats = stats.unwrap();
3817        // check that min/max were properly truncated
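        // With a truncation length of 2 the min is a two-byte prefix ("Bl"), while the
        // truncated max has its last byte incremented ("Bm") so it remains a valid upper
        // bound; both are therefore flagged as inexact.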
3818        assert!(!stats.is_max_value_exact.unwrap());
3819        assert!(!stats.is_min_value_exact.unwrap());
3820        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
3821        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
3822
3823        // check the second page (column "b") now
3824        let second_page = &prot.as_slice()[hdr.compressed_page_size as usize..];
3825        let mut prot = TCompactSliceInputProtocol::new(second_page);
3826        let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
3827        let stats = hdr.data_page_header.unwrap().statistics;
3828        assert!(stats.is_some());
3829        let stats = stats.unwrap();
3830        // check that min/max were properly truncated
3831        assert!(!stats.is_max_value_exact.unwrap());
3832        assert!(!stats.is_min_value_exact.unwrap());
3833        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
3834        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
3835    }
3836}