parquet/arrow/arrow_writer/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains writer which writes arrow data into parquet data.
19
20use bytes::Bytes;
21use std::io::{Read, Write};
22use std::iter::Peekable;
23use std::slice::Iter;
24use std::sync::{Arc, Mutex};
25use std::vec::IntoIter;
26
27use arrow_array::cast::AsArray;
28use arrow_array::types::*;
29use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
30use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};
31
32use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};
33
34use crate::arrow::ArrowSchemaConverter;
35use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
36use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
37use crate::column::page_encryption::PageEncryptor;
38use crate::column::writer::encoder::ColumnValueEncoder;
39use crate::column::writer::{
40    ColumnCloseResult, ColumnWriter, GenericColumnWriter, get_column_writer,
41};
42use crate::data_type::{ByteArray, FixedLenByteArray};
43#[cfg(feature = "encryption")]
44use crate::encryption::encrypt::FileEncryptor;
45use crate::errors::{ParquetError, Result};
46use crate::file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaData};
47use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
48use crate::file::reader::{ChunkReader, Length};
49use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
50use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
51use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
52use levels::{ArrayLevels, calculate_array_levels};
53
54mod byte_array;
55mod levels;
56
57/// Encodes [`RecordBatch`] to parquet
58///
/// Writes Arrow `RecordBatch`es to a Parquet writer. Multiple [`RecordBatch`]es will be encoded
/// into the same row group, up to `max_row_group_size` rows. Any remaining rows will be
/// flushed on close, so the final row group in the output file may contain
/// fewer than `max_row_group_size` rows.
63///
64/// # Example: Writing `RecordBatch`es
65/// ```
66/// # use std::sync::Arc;
67/// # use bytes::Bytes;
68/// # use arrow_array::{ArrayRef, Int64Array};
69/// # use arrow_array::RecordBatch;
70/// # use parquet::arrow::arrow_writer::ArrowWriter;
71/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
72/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
73/// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
74///
75/// let mut buffer = Vec::new();
76/// let mut writer = ArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
77/// writer.write(&to_write).unwrap();
78/// writer.close().unwrap();
79///
80/// let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap();
81/// let read = reader.next().unwrap().unwrap();
82///
83/// assert_eq!(to_write, read);
84/// ```
85///
86/// # Memory Usage and Limiting
87///
88/// The nature of Parquet requires buffering of an entire row group before it can
89/// be flushed to the underlying writer. Data is mostly buffered in its encoded
90/// form, reducing memory usage. However, some data such as dictionary keys,
91/// large strings or very nested data may still result in non-trivial memory
92/// usage.
93///
94/// See Also:
95/// * [`ArrowWriter::memory_size`]: the current memory usage of the writer.
/// * [`ArrowWriter::in_progress_size`]: the estimated encoded size of the buffered row group.
///
/// Call [`Self::flush`] to trigger an early flush of a row group based on a
/// memory threshold and/or global memory pressure. However, smaller row groups
100/// result in higher metadata overheads, and thus may worsen compression ratios
101/// and query performance.
102///
103/// ```no_run
104/// # use std::io::Write;
105/// # use arrow_array::RecordBatch;
106/// # use parquet::arrow::ArrowWriter;
107/// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
108/// # let batch: RecordBatch = todo!();
109/// writer.write(&batch).unwrap();
110/// // Trigger an early flush if anticipated size exceeds 1_000_000
111/// if writer.in_progress_size() > 1_000_000 {
112///     writer.flush().unwrap();
113/// }
114/// ```
115///
116/// ## Type Support
117///
118/// The writer supports writing all Arrow [`DataType`]s that have a direct mapping to
/// Parquet types, including [`StructArray`] and [`ListArray`].
120///
121/// The following are not supported:
122///
123/// * [`IntervalMonthDayNanoArray`]: Parquet does not [support nanosecond intervals].
124///
125/// [`DataType`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html
126/// [`StructArray`]: https://docs.rs/arrow/latest/arrow/array/struct.StructArray.html
127/// [`ListArray`]: https://docs.rs/arrow/latest/arrow/array/type.ListArray.html
128/// [`IntervalMonthDayNanoArray`]: https://docs.rs/arrow/latest/arrow/array/type.IntervalMonthDayNanoArray.html
129/// [support nanosecond intervals]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
130///
131/// ## Type Compatibility
/// The writer can write Arrow [`RecordBatch`]es whose schemas are logically equivalent to the
/// writer's schema. This means that for a given column, the writer can accept multiple Arrow
/// [`DataType`]s that contain the same value type.
135///
136/// For example, the following [`DataType`]s are all logically equivalent and can be written
137/// to the same column:
138/// * String, LargeString, StringView
139/// * Binary, LargeBinary, BinaryView
140///
/// The writer will also accept both native and dictionary encoded arrays, provided the
/// dictionaries contain compatible values.
143/// ```
144/// # use std::sync::Arc;
145/// # use arrow_array::{DictionaryArray, LargeStringArray, RecordBatch, StringArray, UInt8Array};
146/// # use arrow_schema::{DataType, Field, Schema};
147/// # use parquet::arrow::arrow_writer::ArrowWriter;
148/// let record_batch1 = RecordBatch::try_new(
149///    Arc::new(Schema::new(vec![Field::new("col", DataType::LargeUtf8, false)])),
150///    vec![Arc::new(LargeStringArray::from_iter_values(vec!["a", "b"]))]
151///  )
152/// .unwrap();
153///
154/// let mut buffer = Vec::new();
155/// let mut writer = ArrowWriter::try_new(&mut buffer, record_batch1.schema(), None).unwrap();
156/// writer.write(&record_batch1).unwrap();
157///
158/// let record_batch2 = RecordBatch::try_new(
159///     Arc::new(Schema::new(vec![Field::new(
160///         "col",
161///         DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
162///          false,
163///     )])),
164///     vec![Arc::new(DictionaryArray::new(
165///          UInt8Array::from_iter_values(vec![0, 1]),
166///          Arc::new(StringArray::from_iter_values(vec!["b", "c"])),
167///      ))],
168///  )
169///  .unwrap();
170///  writer.write(&record_batch2).unwrap();
///  writer.close().unwrap();
172/// ```
173pub struct ArrowWriter<W: Write> {
174    /// Underlying Parquet writer
175    writer: SerializedFileWriter<W>,
176
177    /// The in-progress row group if any
178    in_progress: Option<ArrowRowGroupWriter>,
179
180    /// A copy of the Arrow schema.
181    ///
182    /// The schema is used to verify that each record batch written has the correct schema
183    arrow_schema: SchemaRef,
184
185    /// Creates new [`ArrowRowGroupWriter`] instances as required
186    row_group_writer_factory: ArrowRowGroupWriterFactory,
187
    /// The maximum number of rows to write in each row group
189    max_row_group_size: usize,
190}
191
192impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
193    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194        let buffered_memory = self.in_progress_size();
195        f.debug_struct("ArrowWriter")
196            .field("writer", &self.writer)
197            .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
198            .field("in_progress_rows", &self.in_progress_rows())
199            .field("arrow_schema", &self.arrow_schema)
200            .field("max_row_group_size", &self.max_row_group_size)
201            .finish()
202    }
203}
204
205impl<W: Write + Send> ArrowWriter<W> {
206    /// Try to create a new Arrow writer
207    ///
208    /// The writer will fail if:
    ///  * a `SerializedFileWriter` cannot be created from the provided writer
210    ///  * the Arrow schema contains unsupported datatypes such as Unions
211    pub fn try_new(
212        writer: W,
213        arrow_schema: SchemaRef,
214        props: Option<WriterProperties>,
215    ) -> Result<Self> {
216        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
217        Self::try_new_with_options(writer, arrow_schema, options)
218    }
219
220    /// Try to create a new Arrow writer with [`ArrowWriterOptions`].
221    ///
222    /// The writer will fail if:
    ///  * a `SerializedFileWriter` cannot be created from the provided writer
224    ///  * the Arrow schema contains unsupported datatypes such as Unions
225    pub fn try_new_with_options(
226        writer: W,
227        arrow_schema: SchemaRef,
228        options: ArrowWriterOptions,
229    ) -> Result<Self> {
230        let mut props = options.properties;
231
232        let schema = if let Some(parquet_schema) = options.schema_descr {
233            parquet_schema.clone()
234        } else {
235            let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
236            if let Some(schema_root) = &options.schema_root {
237                converter = converter.schema_root(schema_root);
238            }
239
240            converter.convert(&arrow_schema)?
241        };
242
243        if !options.skip_arrow_metadata {
244            // add serialized arrow schema
245            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
246        }
247
248        let max_row_group_size = props.max_row_group_size();
249
250        let props_ptr = Arc::new(props);
251        let file_writer =
252            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;
253
254        let row_group_writer_factory =
255            ArrowRowGroupWriterFactory::new(&file_writer, schema, arrow_schema.clone(), props_ptr);
256
257        Ok(Self {
258            writer: file_writer,
259            in_progress: None,
260            arrow_schema,
261            row_group_writer_factory,
262            max_row_group_size,
263        })
264    }
265
266    /// Returns metadata for any flushed row groups
267    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
268        self.writer.flushed_row_groups()
269    }
270
271    /// Estimated memory usage, in bytes, of this `ArrowWriter`
272    ///
    /// This estimate is formed by summing the values of
    /// [`ArrowColumnWriter::memory_size`] for all in-progress columns.
275    pub fn memory_size(&self) -> usize {
276        match &self.in_progress {
277            Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
278            None => 0,
279        }
280    }
281
282    /// Anticipated encoded size of the in progress row group.
283    ///
    /// This estimates the size of the row group once it has been completely
    /// encoded, and is formed by summing the values of
    /// [`ArrowColumnWriter::get_estimated_total_bytes`] for all in-progress
    /// columns.
288    pub fn in_progress_size(&self) -> usize {
289        match &self.in_progress {
290            Some(in_progress) => in_progress
291                .writers
292                .iter()
293                .map(|x| x.get_estimated_total_bytes())
294                .sum(),
295            None => 0,
296        }
297    }
298
299    /// Returns the number of rows buffered in the in progress row group
300    pub fn in_progress_rows(&self) -> usize {
301        self.in_progress
302            .as_ref()
303            .map(|x| x.buffered_rows)
304            .unwrap_or_default()
305    }
306
307    /// Returns the number of bytes written by this instance
308    pub fn bytes_written(&self) -> usize {
309        self.writer.bytes_written()
310    }
311
312    /// Encodes the provided [`RecordBatch`]
313    ///
314    /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`]
315    /// rows, the contents of `batch` will be written to one or more row groups such that all but
316    /// the final row group in the file contain [`WriterProperties::max_row_group_size`] rows.
317    ///
318    /// This will fail if the `batch`'s schema does not match the writer's schema.
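    ///
    /// For example, the following sketch writes a 3 row batch with a maximum of 2 rows
    /// per row group, producing two row groups:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    /// # use parquet::arrow::ArrowWriter;
    /// # use parquet::file::properties::WriterProperties;
    /// let props = WriterProperties::builder().set_max_row_group_size(2).build();
    /// let col = Arc::new(Int32Array::from_iter_values([1, 2, 3])) as ArrayRef;
    /// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
    ///
    /// let mut writer = ArrowWriter::try_new(Vec::<u8>::new(), batch.schema(), Some(props)).unwrap();
    /// writer.write(&batch).unwrap();
    ///
    /// // the first two rows are flushed as one row group, the remaining row on close
    /// let metadata = writer.close().unwrap();
    /// assert_eq!(metadata.num_row_groups(), 2);
    /// ```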
319    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
320        if batch.num_rows() == 0 {
321            return Ok(());
322        }
323
324        let in_progress = match &mut self.in_progress {
325            Some(in_progress) => in_progress,
326            x => x.insert(
327                self.row_group_writer_factory
328                    .create_row_group_writer(self.writer.flushed_row_groups().len())?,
329            ),
330        };
331
332        // If would exceed max_row_group_size, split batch
333        if in_progress.buffered_rows + batch.num_rows() > self.max_row_group_size {
334            let to_write = self.max_row_group_size - in_progress.buffered_rows;
335            let a = batch.slice(0, to_write);
336            let b = batch.slice(to_write, batch.num_rows() - to_write);
337            self.write(&a)?;
338            return self.write(&b);
339        }
340
341        in_progress.write(batch)?;
342
343        if in_progress.buffered_rows >= self.max_row_group_size {
344            self.flush()?
345        }
346        Ok(())
347    }
348
    /// Writes the given `buf` bytes to the internal buffer.
    ///
    /// Unlike [`Self::inner_mut`], it is safe to use this method to write data to the
    /// underlying writer, because it preserves the buffering and byte-counting layers.
353    pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
354        self.writer.write_all(buf)
355    }
356
357    /// Flushes all buffered rows into a new row group
358    pub fn flush(&mut self) -> Result<()> {
359        let in_progress = match self.in_progress.take() {
360            Some(in_progress) => in_progress,
361            None => return Ok(()),
362        };
363
364        let mut row_group_writer = self.writer.next_row_group()?;
365        for chunk in in_progress.close()? {
366            chunk.append_to_row_group(&mut row_group_writer)?;
367        }
368        row_group_writer.close()?;
369        Ok(())
370    }
371
    /// Appends [`KeyValue`] metadata in addition to those from [`WriterProperties`]
    ///
    /// This method provides a way to append key-value metadata after `RecordBatch`es have been written.
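    ///
    /// A rough sketch (assuming the `KeyValue::new` constructor) of appending one extra
    /// key/value pair after the data has been written:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    /// # use parquet::arrow::ArrowWriter;
    /// # use parquet::file::metadata::KeyValue;
    /// let col = Arc::new(Int32Array::from_iter_values([1, 2, 3])) as ArrayRef;
    /// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
    /// let mut writer = ArrowWriter::try_new(Vec::<u8>::new(), batch.schema(), None).unwrap();
    /// writer.write(&batch).unwrap();
    ///
    /// // record provenance information once the data has been written
    /// writer.append_key_value_metadata(KeyValue::new("origin".to_string(), Some("example".to_string())));
    /// writer.close().unwrap();
    /// ```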
375    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
376        self.writer.append_key_value_metadata(kv_metadata)
377    }
378
379    /// Returns a reference to the underlying writer.
380    pub fn inner(&self) -> &W {
381        self.writer.inner()
382    }
383
384    /// Returns a mutable reference to the underlying writer.
385    ///
386    /// **Warning**: if you write directly to this writer, you will skip
    /// the `TrackedWrite` buffering and byte-counting layers. This will cause
    /// the file footer's recorded offsets and sizes to diverge from reality,
389    /// resulting in an unreadable or corrupted Parquet file.
390    ///
391    /// If you want to write safely to the underlying writer, use [`Self::write_all`].
392    pub fn inner_mut(&mut self) -> &mut W {
393        self.writer.inner_mut()
394    }
395
396    /// Flushes any outstanding data and returns the underlying writer.
397    pub fn into_inner(mut self) -> Result<W> {
398        self.flush()?;
399        self.writer.into_inner()
400    }
401
402    /// Close and finalize the underlying Parquet writer
403    ///
    /// Unlike [`Self::close`], this does not consume `self`.
    ///
    /// Attempting to write after calling `finish` will result in an error.
407    pub fn finish(&mut self) -> Result<ParquetMetaData> {
408        self.flush()?;
409        self.writer.finish()
410    }
411
412    /// Close and finalize the underlying Parquet writer
413    pub fn close(mut self) -> Result<ParquetMetaData> {
414        self.finish()
415    }
416
417    /// Create a new row group writer and return its column writers.
418    #[deprecated(since = "56.2.0", note = "Use into_serialized_writer instead")]
419    pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
420        self.flush()?;
421        let in_progress = self
422            .row_group_writer_factory
423            .create_row_group_writer(self.writer.flushed_row_groups().len())?;
424        Ok(in_progress.writers)
425    }
426
427    /// Append the given column chunks to the file as a new row group.
428    #[deprecated(since = "56.2.0", note = "Use into_serialized_writer instead")]
429    pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
430        let mut row_group_writer = self.writer.next_row_group()?;
431        for chunk in chunks {
432            chunk.append_to_row_group(&mut row_group_writer)?;
433        }
434        row_group_writer.close()?;
435        Ok(())
436    }
437
438    /// Converts this writer into a lower-level [`SerializedFileWriter`] and [`ArrowRowGroupWriterFactory`].
439    /// This can be useful to provide more control over how files are written.
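    ///
    /// The following is a rough sketch of driving row group creation manually after the
    /// conversion, assuming a flat schema where each field maps to exactly one leaf column:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    /// # use parquet::arrow::arrow_writer::{compute_leaves, ArrowWriter};
    /// let col = Arc::new(Int32Array::from_iter_values([1, 2, 3])) as ArrayRef;
    /// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
    /// let writer = ArrowWriter::try_new(Vec::<u8>::new(), batch.schema(), None).unwrap();
    /// let (mut file_writer, factory) = writer.into_serialized_writer().unwrap();
    ///
    /// // encode the batch using a fresh set of column writers for row group 0
    /// let mut col_writers = factory.create_column_writers(0).unwrap();
    /// let schema = batch.schema();
    /// let leaves = schema.fields().iter().zip(batch.columns());
    /// for (col_writer, (field, column)) in col_writers.iter_mut().zip(leaves) {
    ///     for leaf in compute_leaves(field.as_ref(), column).unwrap() {
    ///         col_writer.write(&leaf).unwrap();
    ///     }
    /// }
    ///
    /// // append the encoded chunks to the file as a new row group
    /// let mut row_group_writer = file_writer.next_row_group().unwrap();
    /// for col_writer in col_writers {
    ///     let chunk = col_writer.close().unwrap();
    ///     chunk.append_to_row_group(&mut row_group_writer).unwrap();
    /// }
    /// row_group_writer.close().unwrap();
    /// file_writer.close().unwrap();
    /// ```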
440    pub fn into_serialized_writer(
441        mut self,
442    ) -> Result<(SerializedFileWriter<W>, ArrowRowGroupWriterFactory)> {
443        self.flush()?;
444        Ok((self.writer, self.row_group_writer_factory))
445    }
446}
447
448impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
449    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
450        self.write(batch).map_err(|e| e.into())
451    }
452
453    fn close(self) -> std::result::Result<(), ArrowError> {
454        self.close()?;
455        Ok(())
456    }
457}
458
459/// Arrow-specific configuration settings for writing parquet files.
460///
461/// See [`ArrowWriter`] for how to configure the writer.
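///
/// The example below is a minimal sketch of configuring the writer through these options,
/// writing a batch to an in-memory buffer without the embedded arrow schema:
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
/// # use parquet::arrow::arrow_writer::{ArrowWriter, ArrowWriterOptions};
/// # use parquet::file::properties::WriterProperties;
/// let options = ArrowWriterOptions::new()
///     .with_properties(WriterProperties::default())
///     .with_skip_arrow_metadata(true);
///
/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
/// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
///
/// let mut buffer = Vec::new();
/// let mut writer =
///     ArrowWriter::try_new_with_options(&mut buffer, batch.schema(), options).unwrap();
/// writer.write(&batch).unwrap();
/// writer.close().unwrap();
/// ```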
462#[derive(Debug, Clone, Default)]
463pub struct ArrowWriterOptions {
464    properties: WriterProperties,
465    skip_arrow_metadata: bool,
466    schema_root: Option<String>,
467    schema_descr: Option<SchemaDescriptor>,
468}
469
470impl ArrowWriterOptions {
471    /// Creates a new [`ArrowWriterOptions`] with the default settings.
472    pub fn new() -> Self {
473        Self::default()
474    }
475
476    /// Sets the [`WriterProperties`] for writing parquet files.
477    pub fn with_properties(self, properties: WriterProperties) -> Self {
478        Self { properties, ..self }
479    }
480
481    /// Skip encoding the embedded arrow metadata (defaults to `false`)
482    ///
    /// Parquet files generated by the [`ArrowWriter`] contain the embedded arrow schema
    /// by default.
    ///
    /// Set `skip_arrow_metadata` to `true` to skip encoding the embedded metadata.
487    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
488        Self {
489            skip_arrow_metadata,
490            ..self
491        }
492    }
493
494    /// Set the name of the root parquet schema element (defaults to `"arrow_schema"`)
495    pub fn with_schema_root(self, schema_root: String) -> Self {
496        Self {
497            schema_root: Some(schema_root),
498            ..self
499        }
500    }
501
502    /// Explicitly specify the Parquet schema to be used
503    ///
504    /// If omitted (the default), the [`ArrowSchemaConverter`] is used to compute the
    /// Parquet [`SchemaDescriptor`]. This may be useful when the [`SchemaDescriptor`] is
    /// already known or must be calculated using custom logic.
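    ///
    /// A short sketch of precomputing the Parquet schema with the [`ArrowSchemaConverter`]
    /// and supplying it explicitly:
    ///
    /// ```
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::ArrowSchemaConverter;
    /// # use parquet::arrow::arrow_writer::ArrowWriterOptions;
    /// let arrow_schema = Schema::new(vec![Field::new("x", DataType::Int64, false)]);
    /// let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema).unwrap();
    /// let options = ArrowWriterOptions::new().with_parquet_schema(parquet_schema);
    /// # let _ = options;
    /// ```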
507    pub fn with_parquet_schema(self, schema_descr: SchemaDescriptor) -> Self {
508        Self {
509            schema_descr: Some(schema_descr),
510            ..self
511        }
512    }
513}
514
515/// A single column chunk produced by [`ArrowColumnWriter`]
516#[derive(Default)]
517struct ArrowColumnChunkData {
518    length: usize,
519    data: Vec<Bytes>,
520}
521
522impl Length for ArrowColumnChunkData {
523    fn len(&self) -> u64 {
524        self.length as _
525    }
526}
527
528impl ChunkReader for ArrowColumnChunkData {
529    type T = ArrowColumnChunkReader;
530
531    fn get_read(&self, start: u64) -> Result<Self::T> {
532        assert_eq!(start, 0); // Assume append_column writes all data in one-shot
533        Ok(ArrowColumnChunkReader(
534            self.data.clone().into_iter().peekable(),
535        ))
536    }
537
538    fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
539        unimplemented!()
540    }
541}
542
543/// A [`Read`] for [`ArrowColumnChunkData`]
544struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
545
546impl Read for ArrowColumnChunkReader {
547    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
548        let buffer = loop {
549            match self.0.peek_mut() {
550                Some(b) if b.is_empty() => {
551                    self.0.next();
552                    continue;
553                }
554                Some(b) => break b,
555                None => return Ok(0),
556            }
557        };
558
559        let len = buffer.len().min(out.len());
560        let b = buffer.split_to(len);
561        out[..len].copy_from_slice(&b);
562        Ok(len)
563    }
564}
565
566/// A shared [`ArrowColumnChunkData`]
567///
568/// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access via
569/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows
570type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
571
572#[derive(Default)]
573struct ArrowPageWriter {
574    buffer: SharedColumnChunk,
575    #[cfg(feature = "encryption")]
576    page_encryptor: Option<PageEncryptor>,
577}
578
579impl ArrowPageWriter {
580    #[cfg(feature = "encryption")]
581    pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
582        self.page_encryptor = page_encryptor;
583        self
584    }
585
586    #[cfg(feature = "encryption")]
587    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
588        self.page_encryptor.as_mut()
589    }
590
591    #[cfg(not(feature = "encryption"))]
592    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
593        None
594    }
595}
596
597impl PageWriter for ArrowPageWriter {
598    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
599        let page = match self.page_encryptor_mut() {
600            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
601            None => page,
602        };
603
604        let page_header = page.to_thrift_header()?;
605        let header = {
606            let mut header = Vec::with_capacity(1024);
607
608            match self.page_encryptor_mut() {
609                Some(page_encryptor) => {
610                    page_encryptor.encrypt_page_header(&page_header, &mut header)?;
611                    if page.compressed_page().is_data_page() {
612                        page_encryptor.increment_page();
613                    }
614                }
615                None => {
616                    let mut protocol = ThriftCompactOutputProtocol::new(&mut header);
617                    page_header.write_thrift(&mut protocol)?;
618                }
619            };
620
621            Bytes::from(header)
622        };
623
624        let mut buf = self.buffer.try_lock().unwrap();
625
626        let data = page.compressed_page().buffer().clone();
627        let compressed_size = data.len() + header.len();
628
629        let mut spec = PageWriteSpec::new();
630        spec.page_type = page.page_type();
631        spec.num_values = page.num_values();
632        spec.uncompressed_size = page.uncompressed_size() + header.len();
633        spec.offset = buf.length as u64;
634        spec.compressed_size = compressed_size;
635        spec.bytes_written = compressed_size as u64;
636
637        buf.length += compressed_size;
638        buf.data.push(header);
639        buf.data.push(data);
640
641        Ok(spec)
642    }
643
644    fn close(&mut self) -> Result<()> {
645        Ok(())
646    }
647}
648
649/// A leaf column that can be encoded by [`ArrowColumnWriter`]
650#[derive(Debug)]
651pub struct ArrowLeafColumn(ArrayLevels);
652
/// Computes the [`ArrowLeafColumn`]s for a potentially nested [`ArrayRef`]
///
/// This function can be used along with [`get_column_writers`] to encode
/// individual columns in parallel. See the example on [`ArrowColumnWriter`].
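///
/// The following sketch shows that a struct array with two primitive children produces
/// one [`ArrowLeafColumn`] per leaf:
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int32Array, StringArray, StructArray};
/// # use arrow_schema::{DataType, Field, Fields};
/// # use parquet::arrow::arrow_writer::compute_leaves;
/// let fields = Fields::from(vec![
///     Field::new("a", DataType::Int32, false),
///     Field::new("b", DataType::Utf8, false),
/// ]);
/// let array = Arc::new(StructArray::new(
///     fields.clone(),
///     vec![
///         Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef,
///         Arc::new(StringArray::from(vec!["x", "y"])) as ArrayRef,
///     ],
///     None,
/// )) as ArrayRef;
/// let field = Field::new("s", DataType::Struct(fields), false);
///
/// let leaves = compute_leaves(&field, &array).unwrap();
/// assert_eq!(leaves.len(), 2);
/// ```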
657pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
658    let levels = calculate_array_levels(array, field)?;
659    Ok(levels.into_iter().map(ArrowLeafColumn).collect())
660}
661
662/// The data for a single column chunk, see [`ArrowColumnWriter`]
663pub struct ArrowColumnChunk {
664    data: ArrowColumnChunkData,
665    close: ColumnCloseResult,
666}
667
668impl std::fmt::Debug for ArrowColumnChunk {
669    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
670        f.debug_struct("ArrowColumnChunk")
671            .field("length", &self.data.length)
672            .finish_non_exhaustive()
673    }
674}
675
676impl ArrowColumnChunk {
677    /// Calls [`SerializedRowGroupWriter::append_column`] with this column's data
678    pub fn append_to_row_group<W: Write + Send>(
679        self,
680        writer: &mut SerializedRowGroupWriter<'_, W>,
681    ) -> Result<()> {
682        writer.append_column(&self.data, self.close)
683    }
684}
685
686/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
687///
688/// Note: This is a low-level interface for applications that require
689/// fine-grained control of encoding (e.g. encoding using multiple threads),
690/// see [`ArrowWriter`] for a higher-level interface
691///
/// # Example: Encoding two Arrow Arrays in Parallel
693/// ```
694/// // The arrow schema
695/// # use std::sync::Arc;
696/// # use arrow_array::*;
697/// # use arrow_schema::*;
698/// # use parquet::arrow::ArrowSchemaConverter;
699/// # use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, get_column_writers, ArrowColumnChunk};
700/// # use parquet::file::properties::WriterProperties;
701/// # use parquet::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
702/// #
703/// let schema = Arc::new(Schema::new(vec![
704///     Field::new("i32", DataType::Int32, false),
705///     Field::new("f32", DataType::Float32, false),
706/// ]));
707///
708/// // Compute the parquet schema
709/// let props = Arc::new(WriterProperties::default());
710/// let parquet_schema = ArrowSchemaConverter::new()
711///   .with_coerce_types(props.coerce_types())
712///   .convert(&schema)
713///   .unwrap();
714///
715/// // Create writers for each of the leaf columns
716/// let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
717///
718/// // Spawn a worker thread for each column
719/// //
720/// // Note: This is for demonstration purposes, a thread-pool e.g. rayon or tokio, would be better.
721/// // The `map` produces an iterator of type `tuple of (thread handle, send channel)`.
722/// let mut workers: Vec<_> = col_writers
723///     .into_iter()
724///     .map(|mut col_writer| {
725///         let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
726///         let handle = std::thread::spawn(move || {
727///             // receive Arrays to encode via the channel
728///             for col in recv {
729///                 col_writer.write(&col)?;
730///             }
731///             // once the input is complete, close the writer
732///             // to return the newly created ArrowColumnChunk
733///             col_writer.close()
734///         });
735///         (handle, send)
736///     })
737///     .collect();
738///
739/// // Create parquet writer
740/// let root_schema = parquet_schema.root_schema_ptr();
741/// // write to memory in the example, but this could be a File
742/// let mut out = Vec::with_capacity(1024);
743/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone())
744///   .unwrap();
745///
746/// // Start row group
747/// let mut row_group_writer: SerializedRowGroupWriter<'_, _> = writer
748///   .next_row_group()
749///   .unwrap();
750///
751/// // Create some example input columns to encode
752/// let to_write = vec![
753///     Arc::new(Int32Array::from_iter_values([1, 2, 3])) as _,
754///     Arc::new(Float32Array::from_iter_values([1., 45., -1.])) as _,
755/// ];
756///
757/// // Send the input columns to the workers
758/// let mut worker_iter = workers.iter_mut();
759/// for (arr, field) in to_write.iter().zip(&schema.fields) {
760///     for leaves in compute_leaves(field, arr).unwrap() {
761///         worker_iter.next().unwrap().1.send(leaves).unwrap();
762///     }
763/// }
764///
765/// // Wait for the workers to complete encoding, and append
766/// // the resulting column chunks to the row group (and the file)
767/// for (handle, send) in workers {
768///     drop(send); // Drop send side to signal termination
769///     // wait for the worker to send the completed chunk
770///     let chunk: ArrowColumnChunk = handle.join().unwrap().unwrap();
771///     chunk.append_to_row_group(&mut row_group_writer).unwrap();
772/// }
773/// // Close the row group which writes to the underlying file
774/// row_group_writer.close().unwrap();
775///
776/// let metadata = writer.close().unwrap();
777/// assert_eq!(metadata.file_metadata().num_rows(), 3);
778/// ```
779pub struct ArrowColumnWriter {
780    writer: ArrowColumnWriterImpl,
781    chunk: SharedColumnChunk,
782}
783
784impl std::fmt::Debug for ArrowColumnWriter {
785    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
786        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
787    }
788}
789
790enum ArrowColumnWriterImpl {
791    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
792    Column(ColumnWriter<'static>),
793}
794
795impl ArrowColumnWriter {
796    /// Write an [`ArrowLeafColumn`]
797    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
798        match &mut self.writer {
799            ArrowColumnWriterImpl::Column(c) => {
800                write_leaf(c, &col.0)?;
801            }
802            ArrowColumnWriterImpl::ByteArray(c) => {
803                write_primitive(c, col.0.array().as_ref(), &col.0)?;
804            }
805        }
806        Ok(())
807    }
808
809    /// Close this column returning the written [`ArrowColumnChunk`]
810    pub fn close(self) -> Result<ArrowColumnChunk> {
811        let close = match self.writer {
812            ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
813            ArrowColumnWriterImpl::Column(c) => c.close()?,
814        };
815        let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
816        let data = chunk.into_inner().unwrap();
817        Ok(ArrowColumnChunk { data, close })
818    }
819
820    /// Returns the estimated total memory usage by the writer.
821    ///
    /// Unlike [`Self::get_estimated_total_bytes`], this is an estimate
    /// of the current memory usage and not the anticipated encoded size.
824    ///
825    /// This includes:
826    /// 1. Data buffered in encoded form
827    /// 2. Data buffered in un-encoded form (e.g. `usize` dictionary keys)
828    ///
829    /// This value should be greater than or equal to [`Self::get_estimated_total_bytes`]
830    pub fn memory_size(&self) -> usize {
831        match &self.writer {
832            ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
833            ArrowColumnWriterImpl::Column(c) => c.memory_size(),
834        }
835    }
836
837    /// Returns the estimated total encoded bytes for this column writer.
838    ///
839    /// This includes:
840    /// 1. Data buffered in encoded form
841    /// 2. An estimate of how large the data buffered in un-encoded form would be once encoded
842    ///
843    /// This value should be less than or equal to [`Self::memory_size`]
844    pub fn get_estimated_total_bytes(&self) -> usize {
845        match &self.writer {
846            ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
847            ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
848        }
849    }
850}
851
852/// Encodes [`RecordBatch`] to a parquet row group
853struct ArrowRowGroupWriter {
854    writers: Vec<ArrowColumnWriter>,
855    schema: SchemaRef,
856    buffered_rows: usize,
857}
858
859impl ArrowRowGroupWriter {
860    fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
861        Self {
862            writers,
863            schema: arrow.clone(),
864            buffered_rows: 0,
865        }
866    }
867
868    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
869        self.buffered_rows += batch.num_rows();
870        let mut writers = self.writers.iter_mut();
871        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
872            for leaf in compute_leaves(field.as_ref(), column)? {
873                writers.next().unwrap().write(&leaf)?
874            }
875        }
876        Ok(())
877    }
878
879    fn close(self) -> Result<Vec<ArrowColumnChunk>> {
880        self.writers
881            .into_iter()
882            .map(|writer| writer.close())
883            .collect()
884    }
885}
886
887/// Factory that creates new column writers for each row group in the Parquet file.
888pub struct ArrowRowGroupWriterFactory {
889    schema: SchemaDescriptor,
890    arrow_schema: SchemaRef,
891    props: WriterPropertiesPtr,
892    #[cfg(feature = "encryption")]
893    file_encryptor: Option<Arc<FileEncryptor>>,
894}
895
896impl ArrowRowGroupWriterFactory {
897    #[cfg(feature = "encryption")]
898    fn new<W: Write + Send>(
899        file_writer: &SerializedFileWriter<W>,
900        schema: SchemaDescriptor,
901        arrow_schema: SchemaRef,
902        props: WriterPropertiesPtr,
903    ) -> Self {
904        Self {
905            schema,
906            arrow_schema,
907            props,
908            file_encryptor: file_writer.file_encryptor(),
909        }
910    }
911
912    #[cfg(not(feature = "encryption"))]
913    fn new<W: Write + Send>(
914        _file_writer: &SerializedFileWriter<W>,
915        schema: SchemaDescriptor,
916        arrow_schema: SchemaRef,
917        props: WriterPropertiesPtr,
918    ) -> Self {
919        Self {
920            schema,
921            arrow_schema,
922            props,
923        }
924    }
925
926    #[cfg(feature = "encryption")]
927    fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
928        let writers = get_column_writers_with_encryptor(
929            &self.schema,
930            &self.props,
931            &self.arrow_schema,
932            self.file_encryptor.clone(),
933            row_group_index,
934        )?;
935        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
936    }
937
938    #[cfg(not(feature = "encryption"))]
939    fn create_row_group_writer(&self, _row_group_index: usize) -> Result<ArrowRowGroupWriter> {
940        let writers = get_column_writers(&self.schema, &self.props, &self.arrow_schema)?;
941        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
942    }
943
944    /// Create column writers for a new row group.
945    pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
946        let rg_writer = self.create_row_group_writer(row_group_index)?;
947        Ok(rg_writer.writers)
948    }
949}
950
951/// Returns [`ArrowColumnWriter`]s for each column in a given schema
952pub fn get_column_writers(
953    parquet: &SchemaDescriptor,
954    props: &WriterPropertiesPtr,
955    arrow: &SchemaRef,
956) -> Result<Vec<ArrowColumnWriter>> {
957    let mut writers = Vec::with_capacity(arrow.fields.len());
958    let mut leaves = parquet.columns().iter();
959    let column_factory = ArrowColumnWriterFactory::new();
960    for field in &arrow.fields {
961        column_factory.get_arrow_column_writer(
962            field.data_type(),
963            props,
964            &mut leaves,
965            &mut writers,
966        )?;
967    }
968    Ok(writers)
969}
970
/// Returns [`ArrowColumnWriter`]s for each column in a given schema, with support for columnar encryption
972#[cfg(feature = "encryption")]
973fn get_column_writers_with_encryptor(
974    parquet: &SchemaDescriptor,
975    props: &WriterPropertiesPtr,
976    arrow: &SchemaRef,
977    file_encryptor: Option<Arc<FileEncryptor>>,
978    row_group_index: usize,
979) -> Result<Vec<ArrowColumnWriter>> {
980    let mut writers = Vec::with_capacity(arrow.fields.len());
981    let mut leaves = parquet.columns().iter();
982    let column_factory =
983        ArrowColumnWriterFactory::new().with_file_encryptor(row_group_index, file_encryptor);
984    for field in &arrow.fields {
985        column_factory.get_arrow_column_writer(
986            field.data_type(),
987            props,
988            &mut leaves,
989            &mut writers,
990        )?;
991    }
992    Ok(writers)
993}
994
995/// Creates [`ArrowColumnWriter`] instances
996struct ArrowColumnWriterFactory {
997    #[cfg(feature = "encryption")]
998    row_group_index: usize,
999    #[cfg(feature = "encryption")]
1000    file_encryptor: Option<Arc<FileEncryptor>>,
1001}
1002
1003impl ArrowColumnWriterFactory {
1004    pub fn new() -> Self {
1005        Self {
1006            #[cfg(feature = "encryption")]
1007            row_group_index: 0,
1008            #[cfg(feature = "encryption")]
1009            file_encryptor: None,
1010        }
1011    }
1012
1013    #[cfg(feature = "encryption")]
1014    pub fn with_file_encryptor(
1015        mut self,
1016        row_group_index: usize,
1017        file_encryptor: Option<Arc<FileEncryptor>>,
1018    ) -> Self {
1019        self.row_group_index = row_group_index;
1020        self.file_encryptor = file_encryptor;
1021        self
1022    }
1023
1024    #[cfg(feature = "encryption")]
1025    fn create_page_writer(
1026        &self,
1027        column_descriptor: &ColumnDescPtr,
1028        column_index: usize,
1029    ) -> Result<Box<ArrowPageWriter>> {
1030        let column_path = column_descriptor.path().string();
1031        let page_encryptor = PageEncryptor::create_if_column_encrypted(
1032            &self.file_encryptor,
1033            self.row_group_index,
1034            column_index,
1035            &column_path,
1036        )?;
1037        Ok(Box::new(
1038            ArrowPageWriter::default().with_encryptor(page_encryptor),
1039        ))
1040    }
1041
1042    #[cfg(not(feature = "encryption"))]
1043    fn create_page_writer(
1044        &self,
1045        _column_descriptor: &ColumnDescPtr,
1046        _column_index: usize,
1047    ) -> Result<Box<ArrowPageWriter>> {
1048        Ok(Box::<ArrowPageWriter>::default())
1049    }
1050
    /// Gets the [`ArrowColumnWriter`]s for the given `data_type`, consuming the
    /// corresponding [`ColumnDescPtr`]s from `leaves` and appending the writers to `out`
1053    fn get_arrow_column_writer(
1054        &self,
1055        data_type: &ArrowDataType,
1056        props: &WriterPropertiesPtr,
1057        leaves: &mut Iter<'_, ColumnDescPtr>,
1058        out: &mut Vec<ArrowColumnWriter>,
1059    ) -> Result<()> {
1060        // Instantiate writers for normal columns
1061        let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1062            let page_writer = self.create_page_writer(desc, out.len())?;
1063            let chunk = page_writer.buffer.clone();
1064            let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
1065            Ok(ArrowColumnWriter {
1066                chunk,
1067                writer: ArrowColumnWriterImpl::Column(writer),
1068            })
1069        };
1070
        // Instantiate writers for byte arrays (e.g. Utf8, Binary, etc.)
1072        let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1073            let page_writer = self.create_page_writer(desc, out.len())?;
1074            let chunk = page_writer.buffer.clone();
1075            let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
1076            Ok(ArrowColumnWriter {
1077                chunk,
1078                writer: ArrowColumnWriterImpl::ByteArray(writer),
1079            })
1080        };
1081
1082        match data_type {
1083            _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
1084            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
1085                out.push(col(leaves.next().unwrap())?)
1086            }
1087            ArrowDataType::LargeBinary
1088            | ArrowDataType::Binary
1089            | ArrowDataType::Utf8
1090            | ArrowDataType::LargeUtf8
1091            | ArrowDataType::BinaryView
1092            | ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
1093            ArrowDataType::List(f)
1094            | ArrowDataType::LargeList(f)
1095            | ArrowDataType::FixedSizeList(f, _) => {
1096                self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
1097            }
1098            ArrowDataType::Struct(fields) => {
1099                for field in fields {
1100                    self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
1101                }
1102            }
1103            ArrowDataType::Map(f, _) => match f.data_type() {
1104                ArrowDataType::Struct(f) => {
1105                    self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
1106                    self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
1107                }
1108                _ => unreachable!("invalid map type"),
1109            },
1110            ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1111                ArrowDataType::Utf8
1112                | ArrowDataType::LargeUtf8
1113                | ArrowDataType::Binary
1114                | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
1115                ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
1116                    out.push(bytes(leaves.next().unwrap())?)
1117                }
1118                ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
1119                _ => out.push(col(leaves.next().unwrap())?),
1120            },
1121            _ => {
1122                return Err(ParquetError::NYI(format!(
1123                    "Attempting to write an Arrow type {data_type} to parquet that is not yet implemented"
1124                )));
1125            }
1126        }
1127        Ok(())
1128    }
1129}
1130
1131fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
1132    let column = levels.array().as_ref();
1133    let indices = levels.non_null_indices();
1134    match writer {
1135        ColumnWriter::Int32ColumnWriter(typed) => {
1136            match column.data_type() {
1137                ArrowDataType::Date64 => {
1138                    // If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
1139                    let array = arrow_cast::cast(column, &ArrowDataType::Date32)?;
1140                    let array = arrow_cast::cast(&array, &ArrowDataType::Int32)?;
1141
1142                    let array = array.as_primitive::<Int32Type>();
1143                    write_primitive(typed, array.values(), levels)
1144                }
1145                ArrowDataType::UInt32 => {
1146                    let values = column.as_primitive::<UInt32Type>().values();
                    // follow C++ implementation and use overflow/reinterpret cast from u32 to i32 which will map
1148                    // `(i32::MAX as u32)..u32::MAX` to `i32::MIN..0`
1149                    let array = values.inner().typed_data::<i32>();
1150                    write_primitive(typed, array, levels)
1151                }
1152                ArrowDataType::Decimal32(_, _) => {
1153                    let array = column
1154                        .as_primitive::<Decimal32Type>()
1155                        .unary::<_, Int32Type>(|v| v);
1156                    write_primitive(typed, array.values(), levels)
1157                }
1158                ArrowDataType::Decimal64(_, _) => {
1159                    // use the int32 to represent the decimal with low precision
1160                    let array = column
1161                        .as_primitive::<Decimal64Type>()
1162                        .unary::<_, Int32Type>(|v| v as i32);
1163                    write_primitive(typed, array.values(), levels)
1164                }
1165                ArrowDataType::Decimal128(_, _) => {
1166                    // use the int32 to represent the decimal with low precision
1167                    let array = column
1168                        .as_primitive::<Decimal128Type>()
1169                        .unary::<_, Int32Type>(|v| v as i32);
1170                    write_primitive(typed, array.values(), levels)
1171                }
1172                ArrowDataType::Decimal256(_, _) => {
1173                    // use the int32 to represent the decimal with low precision
1174                    let array = column
1175                        .as_primitive::<Decimal256Type>()
1176                        .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1177                    write_primitive(typed, array.values(), levels)
1178                }
1179                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1180                    ArrowDataType::Decimal32(_, _) => {
1181                        let array = arrow_cast::cast(column, value_type)?;
1182                        let array = array
1183                            .as_primitive::<Decimal32Type>()
1184                            .unary::<_, Int32Type>(|v| v);
1185                        write_primitive(typed, array.values(), levels)
1186                    }
1187                    ArrowDataType::Decimal64(_, _) => {
1188                        let array = arrow_cast::cast(column, value_type)?;
1189                        let array = array
1190                            .as_primitive::<Decimal64Type>()
1191                            .unary::<_, Int32Type>(|v| v as i32);
1192                        write_primitive(typed, array.values(), levels)
1193                    }
1194                    ArrowDataType::Decimal128(_, _) => {
1195                        let array = arrow_cast::cast(column, value_type)?;
1196                        let array = array
1197                            .as_primitive::<Decimal128Type>()
1198                            .unary::<_, Int32Type>(|v| v as i32);
1199                        write_primitive(typed, array.values(), levels)
1200                    }
1201                    ArrowDataType::Decimal256(_, _) => {
1202                        let array = arrow_cast::cast(column, value_type)?;
1203                        let array = array
1204                            .as_primitive::<Decimal256Type>()
1205                            .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1206                        write_primitive(typed, array.values(), levels)
1207                    }
1208                    _ => {
1209                        let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
1210                        let array = array.as_primitive::<Int32Type>();
1211                        write_primitive(typed, array.values(), levels)
1212                    }
1213                },
1214                _ => {
1215                    let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
1216                    let array = array.as_primitive::<Int32Type>();
1217                    write_primitive(typed, array.values(), levels)
1218                }
1219            }
1220        }
1221        ColumnWriter::BoolColumnWriter(typed) => {
1222            let array = column.as_boolean();
1223            typed.write_batch(
1224                get_bool_array_slice(array, indices).as_slice(),
1225                levels.def_levels(),
1226                levels.rep_levels(),
1227            )
1228        }
1229        ColumnWriter::Int64ColumnWriter(typed) => {
1230            match column.data_type() {
1231                ArrowDataType::Date64 => {
1232                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1233
1234                    let array = array.as_primitive::<Int64Type>();
1235                    write_primitive(typed, array.values(), levels)
1236                }
1237                ArrowDataType::Int64 => {
1238                    let array = column.as_primitive::<Int64Type>();
1239                    write_primitive(typed, array.values(), levels)
1240                }
1241                ArrowDataType::UInt64 => {
1242                    let values = column.as_primitive::<UInt64Type>().values();
                    // follow C++ implementation and use overflow/reinterpret cast from u64 to i64 which will map
1244                    // `(i64::MAX as u64)..u64::MAX` to `i64::MIN..0`
1245                    let array = values.inner().typed_data::<i64>();
1246                    write_primitive(typed, array, levels)
1247                }
1248                ArrowDataType::Decimal64(_, _) => {
1249                    let array = column
1250                        .as_primitive::<Decimal64Type>()
1251                        .unary::<_, Int64Type>(|v| v);
1252                    write_primitive(typed, array.values(), levels)
1253                }
1254                ArrowDataType::Decimal128(_, _) => {
1255                    // use the int64 to represent the decimal with low precision
1256                    let array = column
1257                        .as_primitive::<Decimal128Type>()
1258                        .unary::<_, Int64Type>(|v| v as i64);
1259                    write_primitive(typed, array.values(), levels)
1260                }
1261                ArrowDataType::Decimal256(_, _) => {
1262                    // use the int64 to represent the decimal with low precision
1263                    let array = column
1264                        .as_primitive::<Decimal256Type>()
1265                        .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1266                    write_primitive(typed, array.values(), levels)
1267                }
1268                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1269                    ArrowDataType::Decimal64(_, _) => {
1270                        let array = arrow_cast::cast(column, value_type)?;
1271                        let array = array
1272                            .as_primitive::<Decimal64Type>()
1273                            .unary::<_, Int64Type>(|v| v);
1274                        write_primitive(typed, array.values(), levels)
1275                    }
1276                    ArrowDataType::Decimal128(_, _) => {
1277                        let array = arrow_cast::cast(column, value_type)?;
1278                        let array = array
1279                            .as_primitive::<Decimal128Type>()
1280                            .unary::<_, Int64Type>(|v| v as i64);
1281                        write_primitive(typed, array.values(), levels)
1282                    }
1283                    ArrowDataType::Decimal256(_, _) => {
1284                        let array = arrow_cast::cast(column, value_type)?;
1285                        let array = array
1286                            .as_primitive::<Decimal256Type>()
1287                            .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1288                        write_primitive(typed, array.values(), levels)
1289                    }
1290                    _ => {
1291                        let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1292                        let array = array.as_primitive::<Int64Type>();
1293                        write_primitive(typed, array.values(), levels)
1294                    }
1295                },
1296                _ => {
1297                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1298                    let array = array.as_primitive::<Int64Type>();
1299                    write_primitive(typed, array.values(), levels)
1300                }
1301            }
1302        }
1303        ColumnWriter::Int96ColumnWriter(_typed) => {
1304            unreachable!("Currently unreachable because data type not supported")
1305        }
1306        ColumnWriter::FloatColumnWriter(typed) => {
1307            let array = column.as_primitive::<Float32Type>();
1308            write_primitive(typed, array.values(), levels)
1309        }
1310        ColumnWriter::DoubleColumnWriter(typed) => {
1311            let array = column.as_primitive::<Float64Type>();
1312            write_primitive(typed, array.values(), levels)
1313        }
1314        ColumnWriter::ByteArrayColumnWriter(_) => {
1315            unreachable!("should use ByteArrayWriter")
1316        }
1317        ColumnWriter::FixedLenByteArrayColumnWriter(typed) => {
1318            let bytes = match column.data_type() {
1319                ArrowDataType::Interval(interval_unit) => match interval_unit {
1320                    IntervalUnit::YearMonth => {
1321                        let array = column
1322                            .as_any()
1323                            .downcast_ref::<arrow_array::IntervalYearMonthArray>()
1324                            .unwrap();
1325                        get_interval_ym_array_slice(array, indices)
1326                    }
1327                    IntervalUnit::DayTime => {
1328                        let array = column
1329                            .as_any()
1330                            .downcast_ref::<arrow_array::IntervalDayTimeArray>()
1331                            .unwrap();
1332                        get_interval_dt_array_slice(array, indices)
1333                    }
1334                    _ => {
1335                        return Err(ParquetError::NYI(format!(
1336                            "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
1337                        )));
1338                    }
1339                },
1340                ArrowDataType::FixedSizeBinary(_) => {
1341                    let array = column
1342                        .as_any()
1343                        .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
1344                        .unwrap();
1345                    get_fsb_array_slice(array, indices)
1346                }
1347                ArrowDataType::Decimal32(_, _) => {
1348                    let array = column.as_primitive::<Decimal32Type>();
1349                    get_decimal_32_array_slice(array, indices)
1350                }
1351                ArrowDataType::Decimal64(_, _) => {
1352                    let array = column.as_primitive::<Decimal64Type>();
1353                    get_decimal_64_array_slice(array, indices)
1354                }
1355                ArrowDataType::Decimal128(_, _) => {
1356                    let array = column.as_primitive::<Decimal128Type>();
1357                    get_decimal_128_array_slice(array, indices)
1358                }
1359                ArrowDataType::Decimal256(_, _) => {
1360                    let array = column
1361                        .as_any()
1362                        .downcast_ref::<arrow_array::Decimal256Array>()
1363                        .unwrap();
1364                    get_decimal_256_array_slice(array, indices)
1365                }
1366                ArrowDataType::Float16 => {
1367                    let array = column.as_primitive::<Float16Type>();
1368                    get_float_16_array_slice(array, indices)
1369                }
1370                _ => {
1371                    return Err(ParquetError::NYI(
1372                        "Attempting to write an Arrow type that is not yet implemented".to_string(),
1373                    ));
1374                }
1375            };
1376            typed.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels())
1377        }
1378    }
1379}
1380
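/// Writes a batch of primitive values to `writer`, passing along only the
/// non-null value indices together with the definition and repetition levels
/// computed for the array.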
1381fn write_primitive<E: ColumnValueEncoder>(
1382    writer: &mut GenericColumnWriter<E>,
1383    values: &E::Values,
1384    levels: &ArrayLevels,
1385) -> Result<usize> {
1386    writer.write_batch_internal(
1387        values,
1388        Some(levels.non_null_indices()),
1389        levels.def_levels(),
1390        levels.rep_levels(),
1391        None,
1392        None,
1393        None,
1394    )
1395}
1396
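/// Collects the boolean values at the given indices into a `Vec<bool>`.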
1397fn get_bool_array_slice(array: &arrow_array::BooleanArray, indices: &[usize]) -> Vec<bool> {
1398    let mut values = Vec::with_capacity(indices.len());
1399    for i in indices {
1400        values.push(array.value(*i))
1401    }
1402    values
1403}
1404
1405/// Returns 12-byte values representing 3 values of months, days and milliseconds (4 bytes each).
1406/// An Arrow YearMonth interval only stores months, thus only the first 4 bytes are populated.
1407fn get_interval_ym_array_slice(
1408    array: &arrow_array::IntervalYearMonthArray,
1409    indices: &[usize],
1410) -> Vec<FixedLenByteArray> {
1411    let mut values = Vec::with_capacity(indices.len());
1412    for i in indices {
1413        let mut value = array.value(*i).to_le_bytes().to_vec();
1414        let mut suffix = vec![0; 8];
1415        value.append(&mut suffix);
1416        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1417    }
1418    values
1419}
1420
1421/// Returns 12-byte values representing 3 values of months, days and milliseconds (4 bytes each).
1422/// An Arrow DayTime interval only stores days and milliseconds, thus the first 4 bytes are not populated.
1423fn get_interval_dt_array_slice(
1424    array: &arrow_array::IntervalDayTimeArray,
1425    indices: &[usize],
1426) -> Vec<FixedLenByteArray> {
1427    let mut values = Vec::with_capacity(indices.len());
1428    for i in indices {
1429        let mut out = [0; 12];
1430        let value = array.value(*i);
1431        out[4..8].copy_from_slice(&value.days.to_le_bytes());
1432        out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1433        values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1434    }
1435    values
1436}
1437
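/// Returns the selected `Decimal32Array` values as fixed-length byte arrays, keeping
/// only the low-order bytes of the 4-byte big-endian representation required by the
/// array's precision.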
1438fn get_decimal_32_array_slice(
1439    array: &arrow_array::Decimal32Array,
1440    indices: &[usize],
1441) -> Vec<FixedLenByteArray> {
1442    let mut values = Vec::with_capacity(indices.len());
1443    let size = decimal_length_from_precision(array.precision());
1444    for i in indices {
1445        let as_be_bytes = array.value(*i).to_be_bytes();
1446        let resized_value = as_be_bytes[(4 - size)..].to_vec();
1447        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1448    }
1449    values
1450}
1451
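/// Returns the selected `Decimal64Array` values as fixed-length byte arrays, keeping
/// only the low-order bytes of the 8-byte big-endian representation required by the
/// array's precision.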
1452fn get_decimal_64_array_slice(
1453    array: &arrow_array::Decimal64Array,
1454    indices: &[usize],
1455) -> Vec<FixedLenByteArray> {
1456    let mut values = Vec::with_capacity(indices.len());
1457    let size = decimal_length_from_precision(array.precision());
1458    for i in indices {
1459        let as_be_bytes = array.value(*i).to_be_bytes();
1460        let resized_value = as_be_bytes[(8 - size)..].to_vec();
1461        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1462    }
1463    values
1464}
1465
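/// Returns the selected `Decimal128Array` values as fixed-length byte arrays, keeping
/// only the low-order bytes of the 16-byte big-endian representation required by the
/// array's precision.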
1466fn get_decimal_128_array_slice(
1467    array: &arrow_array::Decimal128Array,
1468    indices: &[usize],
1469) -> Vec<FixedLenByteArray> {
1470    let mut values = Vec::with_capacity(indices.len());
1471    let size = decimal_length_from_precision(array.precision());
1472    for i in indices {
1473        let as_be_bytes = array.value(*i).to_be_bytes();
1474        let resized_value = as_be_bytes[(16 - size)..].to_vec();
1475        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1476    }
1477    values
1478}
1479
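/// Returns the selected `Decimal256Array` values as fixed-length byte arrays, keeping
/// only the low-order bytes of the 32-byte big-endian representation required by the
/// array's precision.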
1480fn get_decimal_256_array_slice(
1481    array: &arrow_array::Decimal256Array,
1482    indices: &[usize],
1483) -> Vec<FixedLenByteArray> {
1484    let mut values = Vec::with_capacity(indices.len());
1485    let size = decimal_length_from_precision(array.precision());
1486    for i in indices {
1487        let as_be_bytes = array.value(*i).to_be_bytes();
1488        let resized_value = as_be_bytes[(32 - size)..].to_vec();
1489        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1490    }
1491    values
1492}
1493
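/// Returns the selected `Float16Array` values as 2-byte little-endian fixed-length
/// byte arrays.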
1494fn get_float_16_array_slice(
1495    array: &arrow_array::Float16Array,
1496    indices: &[usize],
1497) -> Vec<FixedLenByteArray> {
1498    let mut values = Vec::with_capacity(indices.len());
1499    for i in indices {
1500        let value = array.value(*i).to_le_bytes().to_vec();
1501        values.push(FixedLenByteArray::from(ByteArray::from(value)));
1502    }
1503    values
1504}
1505
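/// Copies the selected `FixedSizeBinaryArray` values into fixed-length byte arrays.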
1506fn get_fsb_array_slice(
1507    array: &arrow_array::FixedSizeBinaryArray,
1508    indices: &[usize],
1509) -> Vec<FixedLenByteArray> {
1510    let mut values = Vec::with_capacity(indices.len());
1511    for i in indices {
1512        let value = array.value(*i).to_vec();
1513        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1514    }
1515    values
1516}
1517
1518#[cfg(test)]
1519mod tests {
1520    use super::*;
1521
1522    use std::fs::File;
1523
1524    use crate::arrow::ARROW_SCHEMA_META_KEY;
1525    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1526    use crate::column::page::{Page, PageReader};
1527    use crate::file::metadata::thrift_gen::PageHeader;
1528    use crate::file::page_index::column_index::ColumnIndexMetaData;
1529    use crate::file::reader::SerializedPageReader;
1530    use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
1531    use crate::schema::types::{ColumnPath, Type};
1532    use arrow::datatypes::ToByteSlice;
1533    use arrow::datatypes::{DataType, Schema};
1534    use arrow::error::Result as ArrowResult;
1535    use arrow::util::data_gen::create_random_array;
1536    use arrow::util::pretty::pretty_format_batches;
1537    use arrow::{array::*, buffer::Buffer};
1538    use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer, i256};
1539    use arrow_schema::Fields;
1540    use half::f16;
1541    use num_traits::{FromPrimitive, ToPrimitive};
1542    use tempfile::tempfile;
1543
1544    use crate::basic::Encoding;
1545    use crate::data_type::AsBytes;
1546    use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
1547    use crate::file::properties::{
1548        BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1549    };
1550    use crate::file::serialized_reader::ReadOptionsBuilder;
1551    use crate::file::{
1552        reader::{FileReader, SerializedFileReader},
1553        statistics::Statistics,
1554    };
1555
1556    #[test]
1557    fn arrow_writer() {
1558        // define schema
1559        let schema = Schema::new(vec![
1560            Field::new("a", DataType::Int32, false),
1561            Field::new("b", DataType::Int32, true),
1562        ]);
1563
1564        // create some data
1565        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1566        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1567
1568        // build a record batch
1569        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
1570
1571        roundtrip(batch, Some(SMALL_SIZE / 2));
1572    }
1573
1574    fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1575        let mut buffer = vec![];
1576
1577        let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
1578        writer.write(expected_batch).unwrap();
1579        writer.close().unwrap();
1580
1581        buffer
1582    }
1583
1584    fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1585        let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
1586        writer.write(expected_batch).unwrap();
1587        writer.into_inner().unwrap()
1588    }
1589
1590    #[test]
1591    fn roundtrip_bytes() {
1592        // define schema
1593        let schema = Arc::new(Schema::new(vec![
1594            Field::new("a", DataType::Int32, false),
1595            Field::new("b", DataType::Int32, true),
1596        ]));
1597
1598        // create some data
1599        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1600        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1601
1602        // build a record batch
1603        let expected_batch =
1604            RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
1605
1606        for buffer in [
1607            get_bytes_after_close(schema.clone(), &expected_batch),
1608            get_bytes_by_into_inner(schema, &expected_batch),
1609        ] {
1610            let cursor = Bytes::from(buffer);
1611            let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
1612
1613            let actual_batch = record_batch_reader
1614                .next()
1615                .expect("No batch found")
1616                .expect("Unable to get batch");
1617
1618            assert_eq!(expected_batch.schema(), actual_batch.schema());
1619            assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
1620            assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
1621            for i in 0..expected_batch.num_columns() {
1622                let expected_data = expected_batch.column(i).to_data();
1623                let actual_data = actual_batch.column(i).to_data();
1624
1625                assert_eq!(expected_data, actual_data);
1626            }
1627        }
1628    }
1629
1630    #[test]
1631    fn arrow_writer_non_null() {
1632        // define schema
1633        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1634
1635        // create some data
1636        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1637
1638        // build a record batch
1639        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1640
1641        roundtrip(batch, Some(SMALL_SIZE / 2));
1642    }
1643
1644    #[test]
1645    fn arrow_writer_list() {
1646        // define schema
1647        let schema = Schema::new(vec![Field::new(
1648            "a",
1649            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1650            true,
1651        )]);
1652
1653        // create some data
1654        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1655
1656        // Construct a buffer for value offsets, for the nested array:
1657        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
1658        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1659
1660        // Construct a list array from the above two
1661        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1662            DataType::Int32,
1663            false,
1664        ))))
1665        .len(5)
1666        .add_buffer(a_value_offsets)
1667        .add_child_data(a_values.into_data())
1668        .null_bit_buffer(Some(Buffer::from([0b00011011])))
1669        .build()
1670        .unwrap();
1671        let a = ListArray::from(a_list_data);
1672
1673        // build a record batch
1674        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1675
1676        assert_eq!(batch.column(0).null_count(), 1);
1677
1678        // This test fails if the max row group size is less than the batch's length
1679        // see https://github.com/apache/arrow-rs/issues/518
1680        roundtrip(batch, None);
1681    }
1682
1683    #[test]
1684    fn arrow_writer_list_non_null() {
1685        // define schema
1686        let schema = Schema::new(vec![Field::new(
1687            "a",
1688            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1689            false,
1690        )]);
1691
1692        // create some data
1693        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1694
1695        // Construct a buffer for value offsets, for the nested array:
1696        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
1697        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1698
1699        // Construct a list array from the above two
1700        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1701            DataType::Int32,
1702            false,
1703        ))))
1704        .len(5)
1705        .add_buffer(a_value_offsets)
1706        .add_child_data(a_values.into_data())
1707        .build()
1708        .unwrap();
1709        let a = ListArray::from(a_list_data);
1710
1711        // build a record batch
1712        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1713
1714        // This test fails if the max row group size is less than the batch's length
1715        // see https://github.com/apache/arrow-rs/issues/518
1716        assert_eq!(batch.column(0).null_count(), 0);
1717
1718        roundtrip(batch, None);
1719    }
1720
1721    #[test]
1722    fn arrow_writer_binary() {
1723        let string_field = Field::new("a", DataType::Utf8, false);
1724        let binary_field = Field::new("b", DataType::Binary, false);
1725        let schema = Schema::new(vec![string_field, binary_field]);
1726
1727        let raw_string_values = vec!["foo", "bar", "baz", "quux"];
1728        let raw_binary_values = [
1729            b"foo".to_vec(),
1730            b"bar".to_vec(),
1731            b"baz".to_vec(),
1732            b"quux".to_vec(),
1733        ];
1734        let raw_binary_value_refs = raw_binary_values
1735            .iter()
1736            .map(|x| x.as_slice())
1737            .collect::<Vec<_>>();
1738
1739        let string_values = StringArray::from(raw_string_values.clone());
1740        let binary_values = BinaryArray::from(raw_binary_value_refs);
1741        let batch = RecordBatch::try_new(
1742            Arc::new(schema),
1743            vec![Arc::new(string_values), Arc::new(binary_values)],
1744        )
1745        .unwrap();
1746
1747        roundtrip(batch, Some(SMALL_SIZE / 2));
1748    }
1749
1750    #[test]
1751    fn arrow_writer_binary_view() {
1752        let string_field = Field::new("a", DataType::Utf8View, false);
1753        let binary_field = Field::new("b", DataType::BinaryView, false);
1754        let nullable_string_field = Field::new("a", DataType::Utf8View, true);
1755        let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
1756
1757        let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
1758        let raw_binary_values = vec![
1759            b"foo".to_vec(),
1760            b"bar".to_vec(),
1761            b"large payload over 12 bytes".to_vec(),
1762            b"lulu".to_vec(),
1763        ];
1764        let nullable_string_values =
1765            vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
1766
1767        let string_view_values = StringViewArray::from(raw_string_values);
1768        let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
1769        let nullable_string_view_values = StringViewArray::from(nullable_string_values);
1770        let batch = RecordBatch::try_new(
1771            Arc::new(schema),
1772            vec![
1773                Arc::new(string_view_values),
1774                Arc::new(binary_view_values),
1775                Arc::new(nullable_string_view_values),
1776            ],
1777        )
1778        .unwrap();
1779
1780        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1781        roundtrip(batch, None);
1782    }
1783
1784    fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
1785        let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
1786        let schema = Schema::new(vec![decimal_field]);
1787
1788        let decimal_values = vec![10_000, 50_000, 0, -100]
1789            .into_iter()
1790            .map(Some)
1791            .collect::<Decimal128Array>()
1792            .with_precision_and_scale(precision, scale)
1793            .unwrap();
1794
1795        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
1796    }
1797
1798    #[test]
1799    fn arrow_writer_decimal() {
1800        // int32 to store the decimal value
1801        let batch_int32_decimal = get_decimal_batch(5, 2);
1802        roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
1803        // int64 to store the decimal value
1804        let batch_int64_decimal = get_decimal_batch(12, 2);
1805        roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
1806        // fixed_length_byte_array to store the decimal value
1807        let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
1808        roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
1809    }
1810
1811    #[test]
1812    fn arrow_writer_complex() {
1813        // define schema
1814        let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1815        let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1816        let struct_field_g = Arc::new(Field::new_list(
1817            "g",
1818            Field::new_list_field(DataType::Int16, true),
1819            false,
1820        ));
1821        let struct_field_h = Arc::new(Field::new_list(
1822            "h",
1823            Field::new_list_field(DataType::Int16, false),
1824            true,
1825        ));
1826        let struct_field_e = Arc::new(Field::new_struct(
1827            "e",
1828            vec![
1829                struct_field_f.clone(),
1830                struct_field_g.clone(),
1831                struct_field_h.clone(),
1832            ],
1833            false,
1834        ));
1835        let schema = Schema::new(vec![
1836            Field::new("a", DataType::Int32, false),
1837            Field::new("b", DataType::Int32, true),
1838            Field::new_struct(
1839                "c",
1840                vec![struct_field_d.clone(), struct_field_e.clone()],
1841                false,
1842            ),
1843        ]);
1844
1845        // create some data
1846        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1847        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1848        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1849        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1850
1851        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1852
1853        // Construct a buffer for value offsets, for the nested array:
1854        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
1855        let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1856
1857        // Construct a list array from the above two
1858        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1859            .len(5)
1860            .add_buffer(g_value_offsets.clone())
1861            .add_child_data(g_value.to_data())
1862            .build()
1863            .unwrap();
1864        let g = ListArray::from(g_list_data);
1865        // The difference between g and h is that h has a null bitmap
1866        let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
1867            .len(5)
1868            .add_buffer(g_value_offsets)
1869            .add_child_data(g_value.to_data())
1870            .null_bit_buffer(Some(Buffer::from([0b00011011])))
1871            .build()
1872            .unwrap();
1873        let h = ListArray::from(h_list_data);
1874
1875        let e = StructArray::from(vec![
1876            (struct_field_f, Arc::new(f) as ArrayRef),
1877            (struct_field_g, Arc::new(g) as ArrayRef),
1878            (struct_field_h, Arc::new(h) as ArrayRef),
1879        ]);
1880
1881        let c = StructArray::from(vec![
1882            (struct_field_d, Arc::new(d) as ArrayRef),
1883            (struct_field_e, Arc::new(e) as ArrayRef),
1884        ]);
1885
1886        // build a record batch
1887        let batch = RecordBatch::try_new(
1888            Arc::new(schema),
1889            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
1890        )
1891        .unwrap();
1892
1893        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1894        roundtrip(batch, Some(SMALL_SIZE / 3));
1895    }
1896
1897    #[test]
1898    fn arrow_writer_complex_mixed() {
1899        // This test was added while investigating https://github.com/apache/arrow-rs/issues/244.
1900        // It was subsequently fixed while investigating https://github.com/apache/arrow-rs/issues/245.
1901
1902        // define schema
1903        let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
1904        let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
1905        let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
1906        let schema = Schema::new(vec![Field::new(
1907            "some_nested_object",
1908            DataType::Struct(Fields::from(vec![
1909                offset_field.clone(),
1910                partition_field.clone(),
1911                topic_field.clone(),
1912            ])),
1913            false,
1914        )]);
1915
1916        // create some data
1917        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1918        let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1919        let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
1920
1921        let some_nested_object = StructArray::from(vec![
1922            (offset_field, Arc::new(offset) as ArrayRef),
1923            (partition_field, Arc::new(partition) as ArrayRef),
1924            (topic_field, Arc::new(topic) as ArrayRef),
1925        ]);
1926
1927        // build a record batch
1928        let batch =
1929            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1930
1931        roundtrip(batch, Some(SMALL_SIZE / 2));
1932    }
1933
1934    #[test]
1935    fn arrow_writer_map() {
1936        // Note: we are using the JSON Arrow reader for brevity
1937        let json_content = r#"
1938        {"stocks":{"long": "$AAA", "short": "$BBB"}}
1939        {"stocks":{"long": null, "long": "$CCC", "short": null}}
1940        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1941        "#;
1942        let entries_struct_type = DataType::Struct(Fields::from(vec![
1943            Field::new("key", DataType::Utf8, false),
1944            Field::new("value", DataType::Utf8, true),
1945        ]));
1946        let stocks_field = Field::new(
1947            "stocks",
1948            DataType::Map(
1949                Arc::new(Field::new("entries", entries_struct_type, false)),
1950                false,
1951            ),
1952            true,
1953        );
1954        let schema = Arc::new(Schema::new(vec![stocks_field]));
1955        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
1956        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
1957
1958        let batch = reader.next().unwrap().unwrap();
1959        roundtrip(batch, None);
1960    }
1961
1962    #[test]
1963    fn arrow_writer_2_level_struct() {
1964        // tests writing struct<struct<primitive>>
1965        let field_c = Field::new("c", DataType::Int32, true);
1966        let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
1967        let type_a = DataType::Struct(vec![field_b.clone()].into());
1968        let field_a = Field::new("a", type_a, true);
1969        let schema = Schema::new(vec![field_a.clone()]);
1970
1971        // create data
1972        let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
1973        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
1974            .len(6)
1975            .null_bit_buffer(Some(Buffer::from([0b00100111])))
1976            .add_child_data(c.into_data())
1977            .build()
1978            .unwrap();
1979        let b = StructArray::from(b_data);
1980        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
1981            .len(6)
1982            .null_bit_buffer(Some(Buffer::from([0b00101111])))
1983            .add_child_data(b.into_data())
1984            .build()
1985            .unwrap();
1986        let a = StructArray::from(a_data);
1987
1988        assert_eq!(a.null_count(), 1);
1989        assert_eq!(a.column(0).null_count(), 2);
1990
1991        // build a record batch
1992        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1993
1994        roundtrip(batch, Some(SMALL_SIZE / 2));
1995    }
1996
1997    #[test]
1998    fn arrow_writer_2_level_struct_non_null() {
1999        // tests writing struct<struct<primitive>>
2000        let field_c = Field::new("c", DataType::Int32, false);
2001        let type_b = DataType::Struct(vec![field_c].into());
2002        let field_b = Field::new("b", type_b.clone(), false);
2003        let type_a = DataType::Struct(vec![field_b].into());
2004        let field_a = Field::new("a", type_a.clone(), false);
2005        let schema = Schema::new(vec![field_a]);
2006
2007        // create data
2008        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2009        let b_data = ArrayDataBuilder::new(type_b)
2010            .len(6)
2011            .add_child_data(c.into_data())
2012            .build()
2013            .unwrap();
2014        let b = StructArray::from(b_data);
2015        let a_data = ArrayDataBuilder::new(type_a)
2016            .len(6)
2017            .add_child_data(b.into_data())
2018            .build()
2019            .unwrap();
2020        let a = StructArray::from(a_data);
2021
2022        assert_eq!(a.null_count(), 0);
2023        assert_eq!(a.column(0).null_count(), 0);
2024
2025        // build a record batch
2026        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2027
2028        roundtrip(batch, Some(SMALL_SIZE / 2));
2029    }
2030
2031    #[test]
2032    fn arrow_writer_2_level_struct_mixed_null() {
2033        // tests writing struct<struct<primitive>>
2034        let field_c = Field::new("c", DataType::Int32, false);
2035        let type_b = DataType::Struct(vec![field_c].into());
2036        let field_b = Field::new("b", type_b.clone(), true);
2037        let type_a = DataType::Struct(vec![field_b].into());
2038        let field_a = Field::new("a", type_a.clone(), false);
2039        let schema = Schema::new(vec![field_a]);
2040
2041        // create data
2042        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2043        let b_data = ArrayDataBuilder::new(type_b)
2044            .len(6)
2045            .null_bit_buffer(Some(Buffer::from([0b00100111])))
2046            .add_child_data(c.into_data())
2047            .build()
2048            .unwrap();
2049        let b = StructArray::from(b_data);
2050        // `a` intentionally has no null buffer, to test that this is handled correctly
2051        let a_data = ArrayDataBuilder::new(type_a)
2052            .len(6)
2053            .add_child_data(b.into_data())
2054            .build()
2055            .unwrap();
2056        let a = StructArray::from(a_data);
2057
2058        assert_eq!(a.null_count(), 0);
2059        assert_eq!(a.column(0).null_count(), 2);
2060
2061        // build a record batch
2062        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2063
2064        roundtrip(batch, Some(SMALL_SIZE / 2));
2065    }
2066
2067    #[test]
2068    fn arrow_writer_2_level_struct_mixed_null_2() {
2069        // tests writing struct<struct<primitive>>, where the primitive columns are non-null.
2070        let field_c = Field::new("c", DataType::Int32, false);
2071        let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
2072        let field_e = Field::new(
2073            "e",
2074            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2075            false,
2076        );
2077
2078        let field_b = Field::new(
2079            "b",
2080            DataType::Struct(vec![field_c, field_d, field_e].into()),
2081            false,
2082        );
2083        let type_a = DataType::Struct(vec![field_b.clone()].into());
2084        let field_a = Field::new("a", type_a, true);
2085        let schema = Schema::new(vec![field_a.clone()]);
2086
2087        // create data
2088        let c = Int32Array::from_iter_values(0..6);
2089        let d = FixedSizeBinaryArray::try_from_iter(
2090            ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
2091        )
2092        .expect("four byte values");
2093        let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
2094        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2095            .len(6)
2096            .add_child_data(c.into_data())
2097            .add_child_data(d.into_data())
2098            .add_child_data(e.into_data())
2099            .build()
2100            .unwrap();
2101        let b = StructArray::from(b_data);
2102        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2103            .len(6)
2104            .null_bit_buffer(Some(Buffer::from([0b00100101])))
2105            .add_child_data(b.into_data())
2106            .build()
2107            .unwrap();
2108        let a = StructArray::from(a_data);
2109
2110        assert_eq!(a.null_count(), 3);
2111        assert_eq!(a.column(0).null_count(), 0);
2112
2113        // build a record batch
2114        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2115
2116        roundtrip(batch, Some(SMALL_SIZE / 2));
2117    }
2118
2119    #[test]
2120    fn test_fixed_size_binary_in_dict() {
2121        fn test_fixed_size_binary_in_dict_inner<K>()
2122        where
2123            K: ArrowDictionaryKeyType,
2124            K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
2125            <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
2126        {
2127            let field = Field::new(
2128                "a",
2129                DataType::Dictionary(
2130                    Box::new(K::DATA_TYPE),
2131                    Box::new(DataType::FixedSizeBinary(4)),
2132                ),
2133                false,
2134            );
2135            let schema = Schema::new(vec![field]);
2136
2137            let keys: Vec<K::Native> = vec![
2138                K::Native::try_from(0u8).unwrap(),
2139                K::Native::try_from(0u8).unwrap(),
2140                K::Native::try_from(1u8).unwrap(),
2141            ];
2142            let keys = PrimitiveArray::<K>::from_iter_values(keys);
2143            let values = FixedSizeBinaryArray::try_from_iter(
2144                vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
2145            )
2146            .unwrap();
2147
2148            let data = DictionaryArray::<K>::new(keys, Arc::new(values));
2149            let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
2150            roundtrip(batch, None);
2151        }
2152
2153        test_fixed_size_binary_in_dict_inner::<UInt8Type>();
2154        test_fixed_size_binary_in_dict_inner::<UInt16Type>();
2155        test_fixed_size_binary_in_dict_inner::<UInt32Type>();
2156        test_fixed_size_binary_in_dict_inner::<UInt64Type>();
2157        test_fixed_size_binary_in_dict_inner::<Int8Type>();
2158        test_fixed_size_binary_in_dict_inner::<Int16Type>();
2159        test_fixed_size_binary_in_dict_inner::<Int32Type>();
2160        test_fixed_size_binary_in_dict_inner::<Int64Type>();
2161    }
2162
2163    #[test]
2164    fn test_empty_dict() {
2165        let struct_fields = Fields::from(vec![Field::new(
2166            "dict",
2167            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2168            false,
2169        )]);
2170
2171        let schema = Schema::new(vec![Field::new_struct(
2172            "struct",
2173            struct_fields.clone(),
2174            true,
2175        )]);
2176        let dictionary = Arc::new(DictionaryArray::new(
2177            Int32Array::new_null(5),
2178            Arc::new(StringArray::new_null(0)),
2179        ));
2180
2181        let s = StructArray::new(
2182            struct_fields,
2183            vec![dictionary],
2184            Some(NullBuffer::new_null(5)),
2185        );
2186
2187        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
2188        roundtrip(batch, None);
2189    }
2190    #[test]
2191    fn arrow_writer_page_size() {
2192        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
2193
2194        let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
2195
2196        // Generate an array of 10 unique 10-character strings
2197        for i in 0..10 {
2198            let value = i
2199                .to_string()
2200                .repeat(10)
2201                .chars()
2202                .take(10)
2203                .collect::<String>();
2204
2205            builder.append_value(value);
2206        }
2207
2208        let array = Arc::new(builder.finish());
2209
2210        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
2211
2212        let file = tempfile::tempfile().unwrap();
2213
2214        // Set everything very low so we fall back to PLAIN encoding after the first row
2215        let props = WriterProperties::builder()
2216            .set_data_page_size_limit(1)
2217            .set_dictionary_page_size_limit(1)
2218            .set_write_batch_size(1)
2219            .build();
2220
2221        let mut writer =
2222            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
2223                .expect("Unable to write file");
2224        writer.write(&batch).unwrap();
2225        writer.close().unwrap();
2226
2227        let options = ReadOptionsBuilder::new().with_page_index().build();
2228        let reader =
2229            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap();
2230
2231        let column = reader.metadata().row_group(0).columns();
2232
2233        assert_eq!(column.len(), 1);
2234
2235        // We should write one row before falling back to PLAIN encoding so there should still be a
2236        // dictionary page.
2237        assert!(
2238            column[0].dictionary_page_offset().is_some(),
2239            "Expected a dictionary page"
2240        );
2241
2242        assert!(reader.metadata().offset_index().is_some());
2243        let offset_indexes = &reader.metadata().offset_index().unwrap()[0];
2244
2245        let page_locations = offset_indexes[0].page_locations.clone();
2246
2247        // We should fall back to PLAIN encoding after the first row, and our max page size is 1 byte,
2248        // so we expect one dictionary-encoded page and then a page per row thereafter.
2249        assert_eq!(
2250            page_locations.len(),
2251            10,
2252            "Expected 10 pages but got {page_locations:#?}"
2253        );
2254    }
2255
2256    #[test]
2257    fn arrow_writer_float_nans() {
2258        let f16_field = Field::new("a", DataType::Float16, false);
2259        let f32_field = Field::new("b", DataType::Float32, false);
2260        let f64_field = Field::new("c", DataType::Float64, false);
2261        let schema = Schema::new(vec![f16_field, f32_field, f64_field]);
2262
2263        let f16_values = (0..MEDIUM_SIZE)
2264            .map(|i| {
2265                Some(if i % 2 == 0 {
2266                    f16::NAN
2267                } else {
2268                    f16::from_f32(i as f32)
2269                })
2270            })
2271            .collect::<Float16Array>();
2272
2273        let f32_values = (0..MEDIUM_SIZE)
2274            .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
2275            .collect::<Float32Array>();
2276
2277        let f64_values = (0..MEDIUM_SIZE)
2278            .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
2279            .collect::<Float64Array>();
2280
2281        let batch = RecordBatch::try_new(
2282            Arc::new(schema),
2283            vec![
2284                Arc::new(f16_values),
2285                Arc::new(f32_values),
2286                Arc::new(f64_values),
2287            ],
2288        )
2289        .unwrap();
2290
2291        roundtrip(batch, None);
2292    }
2293
2294    const SMALL_SIZE: usize = 7;
2295    const MEDIUM_SIZE: usize = 63;
2296
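    // Round trips the batch through in-memory Parquet files using both writer versions,
    // optionally capping the row group size, and returns the written files.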
2297    fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<Bytes> {
2298        let mut files = vec![];
2299        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2300            let mut props = WriterProperties::builder().set_writer_version(version);
2301
2302            if let Some(size) = max_row_group_size {
2303                props = props.set_max_row_group_size(size)
2304            }
2305
2306            let props = props.build();
2307            files.push(roundtrip_opts(&expected_batch, props))
2308        }
2309        files
2310    }
2311
2312    // Round trip the specified record batch with the specified writer properties,
2313    // to an in-memory file, and validate the arrays using the specified function.
2314    // Returns the in-memory file.
2315    fn roundtrip_opts_with_array_validation<F>(
2316        expected_batch: &RecordBatch,
2317        props: WriterProperties,
2318        validate: F,
2319    ) -> Bytes
2320    where
2321        F: Fn(&ArrayData, &ArrayData),
2322    {
2323        let mut file = vec![];
2324
2325        let mut writer = ArrowWriter::try_new(&mut file, expected_batch.schema(), Some(props))
2326            .expect("Unable to write file");
2327        writer.write(expected_batch).unwrap();
2328        writer.close().unwrap();
2329
2330        let file = Bytes::from(file);
2331        let mut record_batch_reader =
2332            ParquetRecordBatchReader::try_new(file.clone(), 1024).unwrap();
2333
2334        let actual_batch = record_batch_reader
2335            .next()
2336            .expect("No batch found")
2337            .expect("Unable to get batch");
2338
2339        assert_eq!(expected_batch.schema(), actual_batch.schema());
2340        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
2341        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
2342        for i in 0..expected_batch.num_columns() {
2343            let expected_data = expected_batch.column(i).to_data();
2344            let actual_data = actual_batch.column(i).to_data();
2345            validate(&expected_data, &actual_data);
2346        }
2347
2348        file
2349    }
2350
2351    fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> Bytes {
2352        roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
2353            a.validate_full().expect("valid expected data");
2354            b.validate_full().expect("valid actual data");
2355            assert_eq!(a, b)
2356        })
2357    }
2358
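    // Options for a single-column round trip, including whether to write bloom filters
    // and where to place them in the file.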
2359    struct RoundTripOptions {
2360        values: ArrayRef,
2361        schema: SchemaRef,
2362        bloom_filter: bool,
2363        bloom_filter_position: BloomFilterPosition,
2364    }
2365
2366    impl RoundTripOptions {
2367        fn new(values: ArrayRef, nullable: bool) -> Self {
2368            let data_type = values.data_type().clone();
2369            let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
2370            Self {
2371                values,
2372                schema: Arc::new(schema),
2373                bloom_filter: false,
2374                bloom_filter_position: BloomFilterPosition::AfterRowGroup,
2375            }
2376        }
2377    }
2378
2379    fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<Bytes> {
2380        one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
2381    }
2382
2383    fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<Bytes> {
2384        let mut options = RoundTripOptions::new(values, false);
2385        options.schema = schema;
2386        one_column_roundtrip_with_options(options)
2387    }
2388
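    // Round trips a single column across a matrix of dictionary settings, encodings
    // (chosen per data type), writer versions and row group sizes.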
2389    fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<Bytes> {
2390        let RoundTripOptions {
2391            values,
2392            schema,
2393            bloom_filter,
2394            bloom_filter_position,
2395        } = options;
2396
2397        let encodings = match values.data_type() {
2398            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
2399                vec![
2400                    Encoding::PLAIN,
2401                    Encoding::DELTA_BYTE_ARRAY,
2402                    Encoding::DELTA_LENGTH_BYTE_ARRAY,
2403                ]
2404            }
2405            DataType::Int64
2406            | DataType::Int32
2407            | DataType::Int16
2408            | DataType::Int8
2409            | DataType::UInt64
2410            | DataType::UInt32
2411            | DataType::UInt16
2412            | DataType::UInt8 => vec![
2413                Encoding::PLAIN,
2414                Encoding::DELTA_BINARY_PACKED,
2415                Encoding::BYTE_STREAM_SPLIT,
2416            ],
2417            DataType::Float32 | DataType::Float64 => {
2418                vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
2419            }
2420            _ => vec![Encoding::PLAIN],
2421        };
2422
2423        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2424
2425        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2426
2427        let mut files = vec![];
2428        for dictionary_size in [0, 1, 1024] {
2429            for encoding in &encodings {
2430                for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2431                    for row_group_size in row_group_sizes {
2432                        let props = WriterProperties::builder()
2433                            .set_writer_version(version)
2434                            .set_max_row_group_size(row_group_size)
2435                            .set_dictionary_enabled(dictionary_size != 0)
2436                            .set_dictionary_page_size_limit(dictionary_size.max(1))
2437                            .set_encoding(*encoding)
2438                            .set_bloom_filter_enabled(bloom_filter)
2439                            .set_bloom_filter_position(bloom_filter_position)
2440                            .build();
2441
2442                        files.push(roundtrip_opts(&expected_batch, props))
2443                    }
2444                }
2445            }
2446        }
2447        files
2448    }
2449
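    // Builds a non-nullable array of type `A` from the iterator and round trips it as a
    // single column.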
2450    fn values_required<A, I>(iter: I) -> Vec<Bytes>
2451    where
2452        A: From<Vec<I::Item>> + Array + 'static,
2453        I: IntoIterator,
2454    {
2455        let raw_values: Vec<_> = iter.into_iter().collect();
2456        let values = Arc::new(A::from(raw_values));
2457        one_column_roundtrip(values, false)
2458    }
2459
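    // Builds a nullable array of type `A` from the iterator, nulling every even index,
    // and round trips it as a single column.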
2460    fn values_optional<A, I>(iter: I) -> Vec<Bytes>
2461    where
2462        A: From<Vec<Option<I::Item>>> + Array + 'static,
2463        I: IntoIterator,
2464    {
2465        let optional_raw_values: Vec<_> = iter
2466            .into_iter()
2467            .enumerate()
2468            .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
2469            .collect();
2470        let optional_values = Arc::new(A::from(optional_raw_values));
2471        one_column_roundtrip(optional_values, true)
2472    }
2473
2474    fn required_and_optional<A, I>(iter: I)
2475    where
2476        A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
2477        I: IntoIterator + Clone,
2478    {
2479        values_required::<A, I>(iter.clone());
2480        values_optional::<A, I>(iter);
2481    }
2482
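    // Reads the bloom filters for `file_column` back from the first written file and
    // asserts that each positive value is present in some row group's filter and that
    // no negative value is.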
2483    fn check_bloom_filter<T: AsBytes>(
2484        files: Vec<Bytes>,
2485        file_column: String,
2486        positive_values: Vec<T>,
2487        negative_values: Vec<T>,
2488    ) {
2489        files.into_iter().take(1).for_each(|file| {
2490            let file_reader = SerializedFileReader::new_with_options(
2491                file,
2492                ReadOptionsBuilder::new()
2493                    .with_reader_properties(
2494                        ReaderProperties::builder()
2495                            .set_read_bloom_filter(true)
2496                            .build(),
2497                    )
2498                    .build(),
2499            )
2500            .expect("Unable to open file as Parquet");
2501            let metadata = file_reader.metadata();
2502
2503            // Gets bloom filters from all row groups.
2504            let mut bloom_filters: Vec<_> = vec![];
2505            for (ri, row_group) in metadata.row_groups().iter().enumerate() {
2506                if let Some((column_index, _)) = row_group
2507                    .columns()
2508                    .iter()
2509                    .enumerate()
2510                    .find(|(_, column)| column.column_path().string() == file_column)
2511                {
2512                    let row_group_reader = file_reader
2513                        .get_row_group(ri)
2514                        .expect("Unable to read row group");
2515                    if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
2516                        bloom_filters.push(sbbf.clone());
2517                    } else {
2518                        panic!("No bloom filter for column named {file_column} found");
2519                    }
2520                } else {
2521                    panic!("No column named {file_column} found");
2522                }
2523            }
2524
2525            positive_values.iter().for_each(|value| {
2526                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2527                assert!(
2528                    found.is_some(),
2529                    "{}",
2530                    format!("Value {:?} should be in bloom filter", value.as_bytes())
2531                );
2532            });
2533
2534            negative_values.iter().for_each(|value| {
2535                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2536                assert!(
2537                    found.is_none(),
2538                    "{}",
2539                    format!("Value {:?} should not be in bloom filter", value.as_bytes())
2540                );
2541            });
2542        });
2543    }
2544
2545    #[test]
2546    fn all_null_primitive_single_column() {
2547        let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
2548        one_column_roundtrip(values, true);
2549    }
2550    #[test]
2551    fn null_single_column() {
2552        let values = Arc::new(NullArray::new(SMALL_SIZE));
2553        one_column_roundtrip(values, true);
2554        // null arrays are always nullable; a test with non-nullable nulls fails
2555    }
2556
2557    #[test]
2558    fn bool_single_column() {
2559        required_and_optional::<BooleanArray, _>(
2560            [true, false].iter().cycle().copied().take(SMALL_SIZE),
2561        );
2562    }
2563
2564    #[test]
2565    fn bool_large_single_column() {
2566        let values = Arc::new(
2567            [None, Some(true), Some(false)]
2568                .iter()
2569                .cycle()
2570                .copied()
2571                .take(200_000)
2572                .collect::<BooleanArray>(),
2573        );
2574        let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
2575        let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2576        let file = tempfile::tempfile().unwrap();
2577
2578        let mut writer =
2579            ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
2580                .expect("Unable to write file");
2581        writer.write(&expected_batch).unwrap();
2582        writer.close().unwrap();
2583    }
2584
2585    #[test]
2586    fn check_page_offset_index_with_nan() {
2587        let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
2588        let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
2589        let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2590
2591        let mut out = Vec::with_capacity(1024);
2592        let mut writer =
2593            ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
2594        writer.write(&batch).unwrap();
2595        let file_meta_data = writer.close().unwrap();
2596        for row_group in file_meta_data.row_groups() {
2597            for column in row_group.columns() {
2598                assert!(column.offset_index_offset().is_some());
2599                assert!(column.offset_index_length().is_some());
2600                assert!(column.column_index_offset().is_none());
2601                assert!(column.column_index_length().is_none());
2602            }
2603        }
2604    }
2605
2606    #[test]
2607    fn i8_single_column() {
2608        required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
2609    }
2610
2611    #[test]
2612    fn i16_single_column() {
2613        required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
2614    }
2615
2616    #[test]
2617    fn i32_single_column() {
2618        required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
2619    }
2620
2621    #[test]
2622    fn i64_single_column() {
2623        required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
2624    }
2625
2626    #[test]
2627    fn u8_single_column() {
2628        required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
2629    }
2630
2631    #[test]
2632    fn u16_single_column() {
2633        required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
2634    }
2635
2636    #[test]
2637    fn u32_single_column() {
2638        required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
2639    }
2640
2641    #[test]
2642    fn u64_single_column() {
2643        required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
2644    }
2645
2646    #[test]
2647    fn f32_single_column() {
2648        required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
2649    }
2650
2651    #[test]
2652    fn f64_single_column() {
2653        required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
2654    }
2655
2656    // The timestamp array types don't implement From<Vec<T>> because they need the timezone
2657    // argument, and they also don't support building from a Vec<Option<T>>, so call
2658    // one_column_roundtrip manually instead of calling required_and_optional for these tests.
2659
2660    #[test]
2661    fn timestamp_second_single_column() {
2662        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2663        let values = Arc::new(TimestampSecondArray::from(raw_values));
2664
2665        one_column_roundtrip(values, false);
2666    }
2667
2668    #[test]
2669    fn timestamp_millisecond_single_column() {
2670        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2671        let values = Arc::new(TimestampMillisecondArray::from(raw_values));
2672
2673        one_column_roundtrip(values, false);
2674    }
2675
2676    #[test]
2677    fn timestamp_microsecond_single_column() {
2678        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2679        let values = Arc::new(TimestampMicrosecondArray::from(raw_values));
2680
2681        one_column_roundtrip(values, false);
2682    }
2683
2684    #[test]
2685    fn timestamp_nanosecond_single_column() {
2686        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2687        let values = Arc::new(TimestampNanosecondArray::from(raw_values));
2688
2689        one_column_roundtrip(values, false);
2690    }
2691
2692    #[test]
2693    fn date32_single_column() {
2694        required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
2695    }
2696
2697    #[test]
2698    fn date64_single_column() {
2699        // Date64 must be a multiple of 86400000, see ARROW-10925
2700        required_and_optional::<Date64Array, _>(
2701            (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
2702        );
2703    }
2704
2705    #[test]
2706    fn time32_second_single_column() {
2707        required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
2708    }
2709
2710    #[test]
2711    fn time32_millisecond_single_column() {
2712        required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
2713    }
2714
2715    #[test]
2716    fn time64_microsecond_single_column() {
2717        required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
2718    }
2719
2720    #[test]
2721    fn time64_nanosecond_single_column() {
2722        required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
2723    }
2724
2725    #[test]
2726    fn duration_second_single_column() {
2727        required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
2728    }
2729
2730    #[test]
2731    fn duration_millisecond_single_column() {
2732        required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
2733    }
2734
2735    #[test]
2736    fn duration_microsecond_single_column() {
2737        required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
2738    }
2739
2740    #[test]
2741    fn duration_nanosecond_single_column() {
2742        required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
2743    }
2744
2745    #[test]
2746    fn interval_year_month_single_column() {
2747        required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
2748    }
2749
2750    #[test]
2751    fn interval_day_time_single_column() {
2752        required_and_optional::<IntervalDayTimeArray, _>(vec![
2753            IntervalDayTime::new(0, 1),
2754            IntervalDayTime::new(0, 3),
2755            IntervalDayTime::new(3, -2),
2756            IntervalDayTime::new(-200, 4),
2757        ]);
2758    }
2759
2760    #[test]
2761    #[should_panic(
2762        expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
2763    )]
2764    fn interval_month_day_nano_single_column() {
2765        required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
2766            IntervalMonthDayNano::new(0, 1, 5),
2767            IntervalMonthDayNano::new(0, 3, 2),
2768            IntervalMonthDayNano::new(3, -2, -5),
2769            IntervalMonthDayNano::new(-200, 4, -1),
2770        ]);
2771    }
2772
2773    #[test]
2774    fn binary_single_column() {
2775        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2776        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2777        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2778
2779        // BinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2780        values_required::<BinaryArray, _>(many_vecs_iter);
2781    }
2782
2783    #[test]
2784    fn binary_view_single_column() {
2785        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2786        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2787        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2788
2789        // BinaryViewArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2790        values_required::<BinaryViewArray, _>(many_vecs_iter);
2791    }
2792
2793    #[test]
2794    fn i32_column_bloom_filter_at_end() {
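            // Write the bloom filter at the end of the file (BloomFilterPosition::End) and
            // verify membership checks still work.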
2795        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2796        let mut options = RoundTripOptions::new(array, false);
2797        options.bloom_filter = true;
2798        options.bloom_filter_position = BloomFilterPosition::End;
2799
2800        let files = one_column_roundtrip_with_options(options);
2801        check_bloom_filter(
2802            files,
2803            "col".to_string(),
2804            (0..SMALL_SIZE as i32).collect(),
2805            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2806        );
2807    }
2808
2809    #[test]
2810    fn i32_column_bloom_filter() {
2811        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2812        let mut options = RoundTripOptions::new(array, false);
2813        options.bloom_filter = true;
2814
2815        let files = one_column_roundtrip_with_options(options);
2816        check_bloom_filter(
2817            files,
2818            "col".to_string(),
2819            (0..SMALL_SIZE as i32).collect(),
2820            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2821        );
2822    }
2823
2824    #[test]
2825    fn binary_column_bloom_filter() {
2826        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2827        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2828        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2829
2830        let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
2831        let mut options = RoundTripOptions::new(array, false);
2832        options.bloom_filter = true;
2833
2834        let files = one_column_roundtrip_with_options(options);
2835        check_bloom_filter(
2836            files,
2837            "col".to_string(),
2838            many_vecs,
2839            vec![vec![(SMALL_SIZE + 1) as u8]],
2840        );
2841    }
2842
2843    #[test]
2844    fn empty_string_null_column_bloom_filter() {
2845        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2846        let raw_strs = raw_values.iter().map(|s| s.as_str());
2847
2848        let array = Arc::new(StringArray::from_iter_values(raw_strs));
2849        let mut options = RoundTripOptions::new(array, false);
2850        options.bloom_filter = true;
2851
2852        let files = one_column_roundtrip_with_options(options);
2853
2854        let optional_raw_values: Vec<_> = raw_values
2855            .iter()
2856            .enumerate()
2857            .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
2858            .collect();
2859        // For null slots, the empty string should not be in the bloom filter.
2860        check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
2861    }
2862
2863    #[test]
2864    fn large_binary_single_column() {
2865        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2866        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2867        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2868
2869        // LargeBinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2870        values_required::<LargeBinaryArray, _>(many_vecs_iter);
2871    }
2872
2873    #[test]
2874    fn fixed_size_binary_single_column() {
2875        let mut builder = FixedSizeBinaryBuilder::new(4);
2876        builder.append_value(b"0123").unwrap();
2877        builder.append_null();
2878        builder.append_value(b"8910").unwrap();
2879        builder.append_value(b"1112").unwrap();
2880        let array = Arc::new(builder.finish());
2881
2882        one_column_roundtrip(array, true);
2883    }
2884
2885    #[test]
2886    fn string_single_column() {
2887        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2888        let raw_strs = raw_values.iter().map(|s| s.as_str());
2889
2890        required_and_optional::<StringArray, _>(raw_strs);
2891    }
2892
2893    #[test]
2894    fn large_string_single_column() {
2895        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2896        let raw_strs = raw_values.iter().map(|s| s.as_str());
2897
2898        required_and_optional::<LargeStringArray, _>(raw_strs);
2899    }
2900
2901    #[test]
2902    fn string_view_single_column() {
2903        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2904        let raw_strs = raw_values.iter().map(|s| s.as_str());
2905
2906        required_and_optional::<StringViewArray, _>(raw_strs);
2907    }
2908
2909    #[test]
2910    fn null_list_single_column() {
2911        let null_field = Field::new_list_field(DataType::Null, true);
2912        let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);
2913
2914        let schema = Schema::new(vec![list_field]);
2915
2916        // Build [[], null, [null, null]]
2917        let a_values = NullArray::new(2);
2918        let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
2919        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2920            DataType::Null,
2921            true,
2922        ))))
2923        .len(3)
2924        .add_buffer(a_value_offsets)
2925        .null_bit_buffer(Some(Buffer::from([0b00000101])))
2926        .add_child_data(a_values.into_data())
2927        .build()
2928        .unwrap();
2929
2930        let a = ListArray::from(a_list_data);
2931
2932        assert!(a.is_valid(0));
2933        assert!(!a.is_valid(1));
2934        assert!(a.is_valid(2));
2935
2936        assert_eq!(a.value(0).len(), 0);
2937        assert_eq!(a.value(2).len(), 2);
2938        assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);
2939
2940        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2941        roundtrip(batch, None);
2942    }
2943
2944    #[test]
2945    fn list_single_column() {
2946        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2947        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2948        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2949            DataType::Int32,
2950            false,
2951        ))))
2952        .len(5)
2953        .add_buffer(a_value_offsets)
2954        .null_bit_buffer(Some(Buffer::from([0b00011011])))
2955        .add_child_data(a_values.into_data())
2956        .build()
2957        .unwrap();
2958
2959        assert_eq!(a_list_data.null_count(), 1);
2960
2961        let a = ListArray::from(a_list_data);
2962        let values = Arc::new(a);
2963
2964        one_column_roundtrip(values, true);
2965    }
2966
2967    #[test]
2968    fn large_list_single_column() {
2969        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2970        let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
2971        let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
2972            "large_item",
2973            DataType::Int32,
2974            true,
2975        ))))
2976        .len(5)
2977        .add_buffer(a_value_offsets)
2978        .add_child_data(a_values.into_data())
2979        .null_bit_buffer(Some(Buffer::from([0b00011011])))
2980        .build()
2981        .unwrap();
2982
2983        // The validity bitmap marks index 2 as null, so the list data should report one null
2984        assert_eq!(a_list_data.null_count(), 1);
2985
2986        let a = LargeListArray::from(a_list_data);
2987        let values = Arc::new(a);
2988
2989        one_column_roundtrip(values, true);
2990    }
2991
2992    #[test]
2993    fn list_nested_nulls() {
2994        use arrow::datatypes::Int32Type;
2995        let data = vec![
2996            Some(vec![Some(1)]),
2997            Some(vec![Some(2), Some(3)]),
2998            None,
2999            Some(vec![Some(4), Some(5), None]),
3000            Some(vec![None]),
3001            Some(vec![Some(6), Some(7)]),
3002        ];
3003
3004        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
3005        one_column_roundtrip(Arc::new(list), true);
3006
3007        let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
3008        one_column_roundtrip(Arc::new(list), true);
3009    }
3010
3011    #[test]
3012    fn struct_single_column() {
3013        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3014        let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
3015        let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
3016
3017        let values = Arc::new(s);
3018        one_column_roundtrip(values, false);
3019    }
3020
3021    #[test]
3022    fn list_and_map_coerced_names() {
3023        // Create map and list with non-Parquet naming
3024        let list_field =
3025            Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
3026        let map_field = Field::new_map(
3027            "my_map",
3028            "entries",
3029            Field::new("keys", DataType::Int32, false),
3030            Field::new("values", DataType::Int32, true),
3031            false,
3032            true,
3033        );
3034
3035        let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
3036        let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();
3037
3038        let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
3039
3040        // Write data to Parquet but coerce names to match spec
3041        let props = Some(WriterProperties::builder().set_coerce_types(true).build());
3042        let file = tempfile::tempfile().unwrap();
3043        let mut writer =
3044            ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();
3045
3046        let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
3047        writer.write(&batch).unwrap();
3048        let file_metadata = writer.close().unwrap();
3049
3050        let schema = file_metadata.file_metadata().schema();
3051        // Coerced name of "item" should be "element"
3052        let list_field = &schema.get_fields()[0].get_fields()[0];
3053        assert_eq!(list_field.get_fields()[0].name(), "element");
3054
3055        let map_field = &schema.get_fields()[1].get_fields()[0];
3056        // Coerced name of "entries" should be "key_value"
3057        assert_eq!(map_field.name(), "key_value");
3058        // Coerced name of "keys" should be "key"
3059        assert_eq!(map_field.get_fields()[0].name(), "key");
3060        // Coerced name of "values" should be "value"
3061        assert_eq!(map_field.get_fields()[1].name(), "value");
3062
3063        // Double check schema after reading from the file
3064        let reader = SerializedFileReader::new(file).unwrap();
3065        let file_schema = reader.metadata().file_metadata().schema();
3066        let fields = file_schema.get_fields();
3067        let list_field = &fields[0].get_fields()[0];
3068        assert_eq!(list_field.get_fields()[0].name(), "element");
3069        let map_field = &fields[1].get_fields()[0];
3070        assert_eq!(map_field.name(), "key_value");
3071        assert_eq!(map_field.get_fields()[0].name(), "key");
3072        assert_eq!(map_field.get_fields()[1].name(), "value");
3073    }
3074
3075    #[test]
3076    fn fallback_flush_data_page() {
3077        // Tests that Fallback::flush_data_page clears all buffers correctly
3078        let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
3079        let values = Arc::new(StringArray::from(raw_values));
3080        let encodings = vec![
3081            Encoding::DELTA_BYTE_ARRAY,
3082            Encoding::DELTA_LENGTH_BYTE_ARRAY,
3083        ];
3084        let data_type = values.data_type().clone();
3085        let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
3086        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3087
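            // A tiny data page size limit and write batch size force frequent page flushes for
            // each encoding and row group size.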
3088        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
3089        let data_page_size_limit: usize = 32;
3090        let write_batch_size: usize = 16;
3091
3092        for encoding in &encodings {
3093            for row_group_size in row_group_sizes {
3094                let props = WriterProperties::builder()
3095                    .set_writer_version(WriterVersion::PARQUET_2_0)
3096                    .set_max_row_group_size(row_group_size)
3097                    .set_dictionary_enabled(false)
3098                    .set_encoding(*encoding)
3099                    .set_data_page_size_limit(data_page_size_limit)
3100                    .set_write_batch_size(write_batch_size)
3101                    .build();
3102
3103                roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
3104                    let string_array_a = StringArray::from(a.clone());
3105                    let string_array_b = StringArray::from(b.clone());
3106                    let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
3107                    let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
3108                    assert_eq!(
3109                        vec_a, vec_b,
3110                        "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
3111                    );
3112                });
3113            }
3114        }
3115    }
3116
3117    #[test]
3118    fn arrow_writer_string_dictionary() {
3119        // define schema
3120        #[allow(deprecated)]
3121        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3122            "dictionary",
3123            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3124            true,
3125            42,
3126            true,
3127        )]));
3128
3129        // create some data
3130        let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3131            .iter()
3132            .copied()
3133            .collect();
3134
3135        // build a record batch
3136        one_column_roundtrip_with_schema(Arc::new(d), schema);
3137    }
3138
3139    #[test]
3140    fn arrow_writer_test_type_compatibility() {
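            // Helper: write `array1` then `array2` (a different but compatible type) to the same
            // writer, read the file back, and check the result equals `expected_result` under
            // `array1`'s schema.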
3141        fn ensure_compatible_write<T1, T2>(array1: T1, array2: T2, expected_result: T1)
3142        where
3143            T1: Array + 'static,
3144            T2: Array + 'static,
3145        {
3146            let schema1 = Arc::new(Schema::new(vec![Field::new(
3147                "a",
3148                array1.data_type().clone(),
3149                false,
3150            )]));
3151
3152            let file = tempfile().unwrap();
3153            let mut writer =
3154                ArrowWriter::try_new(file.try_clone().unwrap(), schema1.clone(), None).unwrap();
3155
3156            let rb1 = RecordBatch::try_new(schema1.clone(), vec![Arc::new(array1)]).unwrap();
3157            writer.write(&rb1).unwrap();
3158
3159            let schema2 = Arc::new(Schema::new(vec![Field::new(
3160                "a",
3161                array2.data_type().clone(),
3162                false,
3163            )]));
3164            let rb2 = RecordBatch::try_new(schema2, vec![Arc::new(array2)]).unwrap();
3165            writer.write(&rb2).unwrap();
3166
3167            writer.close().unwrap();
3168
3169            let mut record_batch_reader =
3170                ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
3171            let actual_batch = record_batch_reader.next().unwrap().unwrap();
3172
3173            let expected_batch =
3174                RecordBatch::try_new(schema1, vec![Arc::new(expected_result)]).unwrap();
3175            assert_eq!(actual_batch, expected_batch);
3176        }
3177
3178        // check compatibility between native arrays and dictionary arrays
3179
3180        ensure_compatible_write(
3181            DictionaryArray::new(
3182                UInt8Array::from_iter_values(vec![0]),
3183                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3184            ),
3185            StringArray::from_iter_values(vec!["barquet"]),
3186            DictionaryArray::new(
3187                UInt8Array::from_iter_values(vec![0, 1]),
3188                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3189            ),
3190        );
3191
3192        ensure_compatible_write(
3193            StringArray::from_iter_values(vec!["parquet"]),
3194            DictionaryArray::new(
3195                UInt8Array::from_iter_values(vec![0]),
3196                Arc::new(StringArray::from_iter_values(vec!["barquet"])),
3197            ),
3198            StringArray::from_iter_values(vec!["parquet", "barquet"]),
3199        );
3200
3201        // check compatibility between dictionaries with different key types
3202
3203        ensure_compatible_write(
3204            DictionaryArray::new(
3205                UInt8Array::from_iter_values(vec![0]),
3206                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3207            ),
3208            DictionaryArray::new(
3209                UInt16Array::from_iter_values(vec![0]),
3210                Arc::new(StringArray::from_iter_values(vec!["barquet"])),
3211            ),
3212            DictionaryArray::new(
3213                UInt8Array::from_iter_values(vec![0, 1]),
3214                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3215            ),
3216        );
3217
3218        // check compatibility between dictionaries with different value types
3219        ensure_compatible_write(
3220            DictionaryArray::new(
3221                UInt8Array::from_iter_values(vec![0]),
3222                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3223            ),
3224            DictionaryArray::new(
3225                UInt8Array::from_iter_values(vec![0]),
3226                Arc::new(LargeStringArray::from_iter_values(vec!["barquet"])),
3227            ),
3228            DictionaryArray::new(
3229                UInt8Array::from_iter_values(vec![0, 1]),
3230                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3231            ),
3232        );
3233
3234        // check compatibility between a dictionary and a native array with a different type
3235        ensure_compatible_write(
3236            DictionaryArray::new(
3237                UInt8Array::from_iter_values(vec![0]),
3238                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3239            ),
3240            LargeStringArray::from_iter_values(vec!["barquet"]),
3241            DictionaryArray::new(
3242                UInt8Array::from_iter_values(vec![0, 1]),
3243                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3244            ),
3245        );
3246
3247        // check compatibility for string types
3248
3249        ensure_compatible_write(
3250            StringArray::from_iter_values(vec!["parquet"]),
3251            LargeStringArray::from_iter_values(vec!["barquet"]),
3252            StringArray::from_iter_values(vec!["parquet", "barquet"]),
3253        );
3254
3255        ensure_compatible_write(
3256            LargeStringArray::from_iter_values(vec!["parquet"]),
3257            StringArray::from_iter_values(vec!["barquet"]),
3258            LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
3259        );
3260
3261        ensure_compatible_write(
3262            StringArray::from_iter_values(vec!["parquet"]),
3263            StringViewArray::from_iter_values(vec!["barquet"]),
3264            StringArray::from_iter_values(vec!["parquet", "barquet"]),
3265        );
3266
3267        ensure_compatible_write(
3268            StringViewArray::from_iter_values(vec!["parquet"]),
3269            StringArray::from_iter_values(vec!["barquet"]),
3270            StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
3271        );
3272
3273        ensure_compatible_write(
3274            LargeStringArray::from_iter_values(vec!["parquet"]),
3275            StringViewArray::from_iter_values(vec!["barquet"]),
3276            LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
3277        );
3278
3279        ensure_compatible_write(
3280            StringViewArray::from_iter_values(vec!["parquet"]),
3281            LargeStringArray::from_iter_values(vec!["barquet"]),
3282            StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
3283        );
3284
3285        // check compatibility for binary types
3286
3287        ensure_compatible_write(
3288            BinaryArray::from_iter_values(vec![b"parquet"]),
3289            LargeBinaryArray::from_iter_values(vec![b"barquet"]),
3290            BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3291        );
3292
3293        ensure_compatible_write(
3294            LargeBinaryArray::from_iter_values(vec![b"parquet"]),
3295            BinaryArray::from_iter_values(vec![b"barquet"]),
3296            LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3297        );
3298
3299        ensure_compatible_write(
3300            BinaryArray::from_iter_values(vec![b"parquet"]),
3301            BinaryViewArray::from_iter_values(vec![b"barquet"]),
3302            BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3303        );
3304
3305        ensure_compatible_write(
3306            BinaryViewArray::from_iter_values(vec![b"parquet"]),
3307            BinaryArray::from_iter_values(vec![b"barquet"]),
3308            BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
3309        );
3310
3311        ensure_compatible_write(
3312            BinaryViewArray::from_iter_values(vec![b"parquet"]),
3313            LargeBinaryArray::from_iter_values(vec![b"barquet"]),
3314            BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
3315        );
3316
3317        ensure_compatible_write(
3318            LargeBinaryArray::from_iter_values(vec![b"parquet"]),
3319            BinaryViewArray::from_iter_values(vec![b"barquet"]),
3320            LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3321        );
3322    }
3323
3324    #[test]
3325    fn arrow_writer_primitive_dictionary() {
3326        // define schema
3327        #[allow(deprecated)]
3328        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3329            "dictionary",
3330            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
3331            true,
3332            42,
3333            true,
3334        )]));
3335
3336        // create some data
3337        let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
3338        builder.append(12345678).unwrap();
3339        builder.append_null();
3340        builder.append(22345678).unwrap();
3341        builder.append(12345678).unwrap();
3342        let d = builder.finish();
3343
3344        one_column_roundtrip_with_schema(Arc::new(d), schema);
3345    }
3346
3347    #[test]
3348    fn arrow_writer_decimal32_dictionary() {
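            // Round-trip a Decimal32 dictionary, then again after swapping in values with a
            // larger precision.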
3349        let integers = vec![12345, 56789, 34567];
3350
3351        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3352
3353        let values = Decimal32Array::from(integers.clone())
3354            .with_precision_and_scale(5, 2)
3355            .unwrap();
3356
3357        let array = DictionaryArray::new(keys, Arc::new(values));
3358        one_column_roundtrip(Arc::new(array.clone()), true);
3359
3360        let values = Decimal32Array::from(integers)
3361            .with_precision_and_scale(9, 2)
3362            .unwrap();
3363
3364        let array = array.with_values(Arc::new(values));
3365        one_column_roundtrip(Arc::new(array), true);
3366    }
3367
3368    #[test]
3369    fn arrow_writer_decimal64_dictionary() {
3370        let integers = vec![12345, 56789, 34567];
3371
3372        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3373
3374        let values = Decimal64Array::from(integers.clone())
3375            .with_precision_and_scale(5, 2)
3376            .unwrap();
3377
3378        let array = DictionaryArray::new(keys, Arc::new(values));
3379        one_column_roundtrip(Arc::new(array.clone()), true);
3380
3381        let values = Decimal64Array::from(integers)
3382            .with_precision_and_scale(12, 2)
3383            .unwrap();
3384
3385        let array = array.with_values(Arc::new(values));
3386        one_column_roundtrip(Arc::new(array), true);
3387    }
3388
3389    #[test]
3390    fn arrow_writer_decimal128_dictionary() {
3391        let integers = vec![12345, 56789, 34567];
3392
3393        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3394
3395        let values = Decimal128Array::from(integers.clone())
3396            .with_precision_and_scale(5, 2)
3397            .unwrap();
3398
3399        let array = DictionaryArray::new(keys, Arc::new(values));
3400        one_column_roundtrip(Arc::new(array.clone()), true);
3401
3402        let values = Decimal128Array::from(integers)
3403            .with_precision_and_scale(12, 2)
3404            .unwrap();
3405
3406        let array = array.with_values(Arc::new(values));
3407        one_column_roundtrip(Arc::new(array), true);
3408    }
3409
3410    #[test]
3411    fn arrow_writer_decimal256_dictionary() {
3412        let integers = vec![
3413            i256::from_i128(12345),
3414            i256::from_i128(56789),
3415            i256::from_i128(34567),
3416        ];
3417
3418        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3419
3420        let values = Decimal256Array::from(integers.clone())
3421            .with_precision_and_scale(5, 2)
3422            .unwrap();
3423
3424        let array = DictionaryArray::new(keys, Arc::new(values));
3425        one_column_roundtrip(Arc::new(array.clone()), true);
3426
3427        let values = Decimal256Array::from(integers)
3428            .with_precision_and_scale(12, 2)
3429            .unwrap();
3430
3431        let array = array.with_values(Arc::new(values));
3432        one_column_roundtrip(Arc::new(array), true);
3433    }
3434
3435    #[test]
3436    fn arrow_writer_string_dictionary_unsigned_index() {
3437        // define schema
3438        #[allow(deprecated)]
3439        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3440            "dictionary",
3441            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3442            true,
3443            42,
3444            true,
3445        )]));
3446
3447        // create some data
3448        let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3449            .iter()
3450            .copied()
3451            .collect();
3452
3453        one_column_roundtrip_with_schema(Arc::new(d), schema);
3454    }
3455
3456    #[test]
3457    fn u32_min_max() {
3458        // check values roundtrip through parquet
3459        let src = [
3460            u32::MIN,
3461            u32::MIN + 1,
3462            (i32::MAX as u32) - 1,
3463            i32::MAX as u32,
3464            (i32::MAX as u32) + 1,
3465            u32::MAX - 1,
3466            u32::MAX,
3467        ];
3468        let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
3469        let files = one_column_roundtrip(values, false);
3470
3471        for file in files {
3472            // check statistics are valid
3473            let reader = SerializedFileReader::new(file).unwrap();
3474            let metadata = reader.metadata();
3475
3476            let mut row_offset = 0;
3477            for row_group in metadata.row_groups() {
3478                assert_eq!(row_group.num_columns(), 1);
3479                let column = row_group.column(0);
3480
3481                let num_values = column.num_values() as usize;
3482                let src_slice = &src[row_offset..row_offset + num_values];
3483                row_offset += column.num_values() as usize;
3484
3485                let stats = column.statistics().unwrap();
3486                if let Statistics::Int32(stats) = stats {
3487                    assert_eq!(
3488                        *stats.min_opt().unwrap() as u32,
3489                        *src_slice.iter().min().unwrap()
3490                    );
3491                    assert_eq!(
3492                        *stats.max_opt().unwrap() as u32,
3493                        *src_slice.iter().max().unwrap()
3494                    );
3495                } else {
3496                    panic!("Statistics::Int32 missing")
3497                }
3498            }
3499        }
3500    }
3501
3502    #[test]
3503    fn u64_min_max() {
3504        // check values roundtrip through parquet
3505        let src = [
3506            u64::MIN,
3507            u64::MIN + 1,
3508            (i64::MAX as u64) - 1,
3509            i64::MAX as u64,
3510            (i64::MAX as u64) + 1,
3511            u64::MAX - 1,
3512            u64::MAX,
3513        ];
3514        let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
3515        let files = one_column_roundtrip(values, false);
3516
3517        for file in files {
3518            // check statistics are valid
3519            let reader = SerializedFileReader::new(file).unwrap();
3520            let metadata = reader.metadata();
3521
3522            let mut row_offset = 0;
3523            for row_group in metadata.row_groups() {
3524                assert_eq!(row_group.num_columns(), 1);
3525                let column = row_group.column(0);
3526
3527                let num_values = column.num_values() as usize;
3528                let src_slice = &src[row_offset..row_offset + num_values];
3529                row_offset += column.num_values() as usize;
3530
3531                let stats = column.statistics().unwrap();
3532                if let Statistics::Int64(stats) = stats {
3533                    assert_eq!(
3534                        *stats.min_opt().unwrap() as u64,
3535                        *src_slice.iter().min().unwrap()
3536                    );
3537                    assert_eq!(
3538                        *stats.max_opt().unwrap() as u64,
3539                        *src_slice.iter().max().unwrap()
3540                    );
3541                } else {
3542                    panic!("Statistics::Int64 missing")
3543                }
3544            }
3545        }
3546    }
3547
3548    #[test]
3549    fn statistics_null_counts_only_nulls() {
3550        // check that null-count statistics are correct for columns containing only NULLs
3551        let values = Arc::new(UInt64Array::from(vec![None, None]));
3552        let files = one_column_roundtrip(values, true);
3553
3554        for file in files {
3555            // check statistics are valid
3556            let reader = SerializedFileReader::new(file).unwrap();
3557            let metadata = reader.metadata();
3558            assert_eq!(metadata.num_row_groups(), 1);
3559            let row_group = metadata.row_group(0);
3560            assert_eq!(row_group.num_columns(), 1);
3561            let column = row_group.column(0);
3562            let stats = column.statistics().unwrap();
3563            assert_eq!(stats.null_count_opt(), Some(2));
3564        }
3565    }
3566
3567    #[test]
3568    fn test_list_of_struct_roundtrip() {
3569        // define schema
3570        let int_field = Field::new("a", DataType::Int32, true);
3571        let int_field2 = Field::new("b", DataType::Int32, true);
3572
3573        let int_builder = Int32Builder::with_capacity(10);
3574        let int_builder2 = Int32Builder::with_capacity(10);
3575
3576        let struct_builder = StructBuilder::new(
3577            vec![int_field, int_field2],
3578            vec![Box::new(int_builder), Box::new(int_builder2)],
3579        );
3580        let mut list_builder = ListBuilder::new(struct_builder);
3581
3582        // Construct the following array
3583        // [{a: 1, b: 2}], [], null, [null, null], [{a: null, b: 3}], [{a: 2, b: null}]
3584
3585        // [{a: 1, b: 2}]
3586        let values = list_builder.values();
3587        values
3588            .field_builder::<Int32Builder>(0)
3589            .unwrap()
3590            .append_value(1);
3591        values
3592            .field_builder::<Int32Builder>(1)
3593            .unwrap()
3594            .append_value(2);
3595        values.append(true);
3596        list_builder.append(true);
3597
3598        // []
3599        list_builder.append(true);
3600
3601        // null
3602        list_builder.append(false);
3603
3604        // [null, null]
3605        let values = list_builder.values();
3606        values
3607            .field_builder::<Int32Builder>(0)
3608            .unwrap()
3609            .append_null();
3610        values
3611            .field_builder::<Int32Builder>(1)
3612            .unwrap()
3613            .append_null();
3614        values.append(false);
3615        values
3616            .field_builder::<Int32Builder>(0)
3617            .unwrap()
3618            .append_null();
3619        values
3620            .field_builder::<Int32Builder>(1)
3621            .unwrap()
3622            .append_null();
3623        values.append(false);
3624        list_builder.append(true);
3625
3626        // [{a: null, b: 3}]
3627        let values = list_builder.values();
3628        values
3629            .field_builder::<Int32Builder>(0)
3630            .unwrap()
3631            .append_null();
3632        values
3633            .field_builder::<Int32Builder>(1)
3634            .unwrap()
3635            .append_value(3);
3636        values.append(true);
3637        list_builder.append(true);
3638
3639        // [{a: 2, b: null}]
3640        let values = list_builder.values();
3641        values
3642            .field_builder::<Int32Builder>(0)
3643            .unwrap()
3644            .append_value(2);
3645        values
3646            .field_builder::<Int32Builder>(1)
3647            .unwrap()
3648            .append_null();
3649        values.append(true);
3650        list_builder.append(true);
3651
3652        let array = Arc::new(list_builder.finish());
3653
3654        one_column_roundtrip(array, true);
3655    }
3656
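        /// Returns the number of rows in each row group of `metadata`.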
3657    fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
3658        metadata.row_groups().iter().map(|x| x.num_rows()).collect()
3659    }
3660
3661    #[test]
3662    fn test_aggregates_records() {
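            // Write batches of 100, 50 and 300 rows with max_row_group_size = 200, then verify
            // the rows are split into row groups of 200, 200 and 50.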
3663        let arrays = [
3664            Int32Array::from((0..100).collect::<Vec<_>>()),
3665            Int32Array::from((0..50).collect::<Vec<_>>()),
3666            Int32Array::from((200..500).collect::<Vec<_>>()),
3667        ];
3668
3669        let schema = Arc::new(Schema::new(vec![Field::new(
3670            "int",
3671            ArrowDataType::Int32,
3672            false,
3673        )]));
3674
3675        let file = tempfile::tempfile().unwrap();
3676
3677        let props = WriterProperties::builder()
3678            .set_max_row_group_size(200)
3679            .build();
3680
3681        let mut writer =
3682            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
3683
3684        for array in arrays {
3685            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
3686            writer.write(&batch).unwrap();
3687        }
3688
3689        writer.close().unwrap();
3690
3691        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3692        assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);
3693
3694        let batches = builder
3695            .with_batch_size(100)
3696            .build()
3697            .unwrap()
3698            .collect::<ArrowResult<Vec<_>>>()
3699            .unwrap();
3700
3701        assert_eq!(batches.len(), 5);
3702        assert!(batches.iter().all(|x| x.num_columns() == 1));
3703
3704        let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3705
3706        assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
3707
3708        let values: Vec<_> = batches
3709            .iter()
3710            .flat_map(|x| {
3711                x.column(0)
3712                    .as_any()
3713                    .downcast_ref::<Int32Array>()
3714                    .unwrap()
3715                    .values()
3716                    .iter()
3717                    .cloned()
3718            })
3719            .collect();
3720
3721        let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
3722        assert_eq!(&values, &expected_values)
3723    }
3724
3725    #[test]
3726    fn complex_aggregate() {
3727        // Tests aggregating nested data
3728        let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
3729        let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
3730        let struct_a = Arc::new(Field::new(
3731            "struct_a",
3732            DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
3733            true,
3734        ));
3735
3736        let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
3737        let struct_b = Arc::new(Field::new(
3738            "struct_b",
3739            DataType::Struct(vec![list_a.clone()].into()),
3740            false,
3741        ));
3742
3743        let schema = Arc::new(Schema::new(vec![struct_b]));
3744
3745        // create nested data
3746        let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
3747        let field_b_array =
3748            Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);
3749
3750        let struct_a_array = StructArray::from(vec![
3751            (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
3752            (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
3753        ]);
3754
3755        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3756            .len(5)
3757            .add_buffer(Buffer::from_iter(vec![
3758                0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
3759            ]))
3760            .null_bit_buffer(Some(Buffer::from_iter(vec![
3761                true, false, true, false, true,
3762            ])))
3763            .child_data(vec![struct_a_array.into_data()])
3764            .build()
3765            .unwrap();
3766
3767        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3768        let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);
3769
3770        let batch1 =
3771            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3772                .unwrap();
3773
3774        let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
3775        let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);
3776
3777        let struct_a_array = StructArray::from(vec![
3778            (field_a, Arc::new(field_a_array) as ArrayRef),
3779            (field_b, Arc::new(field_b_array) as ArrayRef),
3780        ]);
3781
3782        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3783            .len(2)
3784            .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
3785            .child_data(vec![struct_a_array.into_data()])
3786            .build()
3787            .unwrap();
3788
3789        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3790        let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);
3791
3792        let batch2 =
3793            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3794                .unwrap();
3795
3796        let batches = &[batch1, batch2];
3797
3798        // Verify data is as expected
3799
3800        let expected = r#"
3801            +-------------------------------------------------------------------------------------------------------+
3802            | struct_b                                                                                              |
3803            +-------------------------------------------------------------------------------------------------------+
3804            | {list: [{leaf_a: 1, leaf_b: 1}]}                                                                      |
3805            | {list: }                                                                                              |
3806            | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]}                                               |
3807            | {list: }                                                                                              |
3808            | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]}                                                |
3809            | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
3810            | {list: [{leaf_a: 10, leaf_b: }]}                                                                      |
3811            +-------------------------------------------------------------------------------------------------------+
3812        "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");
3813
3814        let actual = pretty_format_batches(batches).unwrap().to_string();
3815        assert_eq!(actual, expected);
3816
3817        // Write data
3818        let file = tempfile::tempfile().unwrap();
3819        let props = WriterProperties::builder()
3820            .set_max_row_group_size(6)
3821            .build();
3822
3823        let mut writer =
3824            ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
3825
3826        for batch in batches {
3827            writer.write(batch).unwrap();
3828        }
3829        writer.close().unwrap();
3830
3831        // Read Data
3832        // Should have written entire first batch and first row of second to the first row group
3833        // leaving a single row in the second row group
3834
3835        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3836        assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);
3837
3838        let batches = builder
3839            .with_batch_size(2)
3840            .build()
3841            .unwrap()
3842            .collect::<ArrowResult<Vec<_>>>()
3843            .unwrap();
3844
3845        assert_eq!(batches.len(), 4);
3846        let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3847        assert_eq!(&batch_counts, &[2, 2, 2, 1]);
3848
3849        let actual = pretty_format_batches(&batches).unwrap().to_string();
3850        assert_eq!(actual, expected);
3851    }
3852
3853    #[test]
3854    fn test_arrow_writer_metadata() {
3855        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3856        let file_schema = batch_schema.clone().with_metadata(
3857            vec![("foo".to_string(), "bar".to_string())]
3858                .into_iter()
3859                .collect(),
3860        );
3861
3862        let batch = RecordBatch::try_new(
3863            Arc::new(batch_schema),
3864            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3865        )
3866        .unwrap();
3867
3868        let mut buf = Vec::with_capacity(1024);
3869        let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
3870        writer.write(&batch).unwrap();
3871        writer.close().unwrap();
3872    }
3873
3874    #[test]
3875    fn test_arrow_writer_nullable() {
3876        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3877        let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
3878        let file_schema = Arc::new(file_schema);
3879
3880        let batch = RecordBatch::try_new(
3881            Arc::new(batch_schema),
3882            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3883        )
3884        .unwrap();
3885
3886        let mut buf = Vec::with_capacity(1024);
3887        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3888        writer.write(&batch).unwrap();
3889        writer.close().unwrap();
3890
3891        let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
3892        let back = read.next().unwrap().unwrap();
3893        assert_eq!(back.schema(), file_schema);
3894        assert_ne!(back.schema(), batch.schema());
3895        assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
3896    }
3897
3898    #[test]
3899    fn in_progress_accounting() {
3900        // define schema
3901        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3902
3903        // create some data
3904        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3905
3906        // build a record batch
3907        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
3908
3909        let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
3910
3911        // starts empty
3912        assert_eq!(writer.in_progress_size(), 0);
3913        assert_eq!(writer.in_progress_rows(), 0);
3914        assert_eq!(writer.memory_size(), 0);
3915        assert_eq!(writer.bytes_written(), 4); // Initial 4-byte `PAR1` magic header
3916        writer.write(&batch).unwrap();
3917
3918        // updated on write
3919        let initial_size = writer.in_progress_size();
3920        assert!(initial_size > 0);
3921        assert_eq!(writer.in_progress_rows(), 5);
3922        let initial_memory = writer.memory_size();
3923        assert!(initial_memory > 0);
3924        // memory estimate is at least as large as the estimated encoded size
3925        assert!(
3926            initial_size <= initial_memory,
3927            "{initial_size} <= {initial_memory}"
3928        );
3929
3930        // updated on second write
3931        writer.write(&batch).unwrap();
3932        assert!(writer.in_progress_size() > initial_size);
3933        assert_eq!(writer.in_progress_rows(), 10);
3934        assert!(writer.memory_size() > initial_memory);
3935        assert!(
3936            writer.in_progress_size() <= writer.memory_size(),
3937            "in_progress_size {} <= memory_size {}",
3938            writer.in_progress_size(),
3939            writer.memory_size()
3940        );
3941
3942        // in progress tracking is cleared, but the overall data written is updated
3943        let pre_flush_bytes_written = writer.bytes_written();
3944        writer.flush().unwrap();
3945        assert_eq!(writer.in_progress_size(), 0);
3946        assert_eq!(writer.memory_size(), 0);
3947        assert!(writer.bytes_written() > pre_flush_bytes_written);
3948
3949        writer.close().unwrap();
3950    }
3951
3952    #[test]
3953    fn test_writer_all_null() {
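            // Column "b" contains only nulls; its page should still appear in the offset index.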
3954        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3955        let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
3956        let batch = RecordBatch::try_from_iter(vec![
3957            ("a", Arc::new(a) as ArrayRef),
3958            ("b", Arc::new(b) as ArrayRef),
3959        ])
3960        .unwrap();
3961
3962        let mut buf = Vec::with_capacity(1024);
3963        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
3964        writer.write(&batch).unwrap();
3965        writer.close().unwrap();
3966
3967        let bytes = Bytes::from(buf);
3968        let options = ReadOptionsBuilder::new().with_page_index().build();
3969        let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
3970        let index = reader.metadata().offset_index().unwrap();
3971
3972        assert_eq!(index.len(), 1);
3973        assert_eq!(index[0].len(), 2); // 2 columns
3974        assert_eq!(index[0][0].page_locations().len(), 1); // 1 page
3975        assert_eq!(index[0][1].page_locations().len(), 1); // 1 page
3976    }
3977
3978    #[test]
3979    fn test_disabled_statistics_with_page() {
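            // Statistics are disabled globally but enabled at page level for column "a" only,
            // so only "a" should get a column index and chunk-level statistics.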
3980        let file_schema = Schema::new(vec![
3981            Field::new("a", DataType::Utf8, true),
3982            Field::new("b", DataType::Utf8, true),
3983        ]);
3984        let file_schema = Arc::new(file_schema);
3985
3986        let batch = RecordBatch::try_new(
3987            file_schema.clone(),
3988            vec![
3989                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3990                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3991            ],
3992        )
3993        .unwrap();
3994
3995        let props = WriterProperties::builder()
3996            .set_statistics_enabled(EnabledStatistics::None)
3997            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
3998            .build();
3999
4000        let mut buf = Vec::with_capacity(1024);
4001        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
4002        writer.write(&batch).unwrap();
4003
4004        let metadata = writer.close().unwrap();
4005        assert_eq!(metadata.num_row_groups(), 1);
4006        let row_group = metadata.row_group(0);
4007        assert_eq!(row_group.num_columns(), 2);
4008        // Column "a" has both offset and column index, as requested
4009        assert!(row_group.column(0).offset_index_offset().is_some());
4010        assert!(row_group.column(0).column_index_offset().is_some());
4011        // Column "b" should only have offset index
4012        assert!(row_group.column(1).offset_index_offset().is_some());
4013        assert!(row_group.column(1).column_index_offset().is_none());
4014
4015        let options = ReadOptionsBuilder::new().with_page_index().build();
4016        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
4017
4018        let row_group = reader.get_row_group(0).unwrap();
4019        let a_col = row_group.metadata().column(0);
4020        let b_col = row_group.metadata().column(1);
4021
4022        // Column chunk of column "a" should have chunk level statistics
4023        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
4024            let min = byte_array_stats.min_opt().unwrap();
4025            let max = byte_array_stats.max_opt().unwrap();
4026
4027            assert_eq!(min.as_bytes(), b"a");
4028            assert_eq!(max.as_bytes(), b"d");
4029        } else {
4030            panic!("expecting Statistics::ByteArray");
4031        }
4032
4033        // The column chunk for column "b" shouldn't have statistics
4034        assert!(b_col.statistics().is_none());
4035
4036        let offset_index = reader.metadata().offset_index().unwrap();
4037        assert_eq!(offset_index.len(), 1); // 1 row group
4038        assert_eq!(offset_index[0].len(), 2); // 2 columns
4039
4040        let column_index = reader.metadata().column_index().unwrap();
4041        assert_eq!(column_index.len(), 1); // 1 row group
4042        assert_eq!(column_index[0].len(), 2); // 2 columns
4043
4044        let a_idx = &column_index[0][0];
4045        assert!(
4046            matches!(a_idx, ColumnIndexMetaData::BYTE_ARRAY(_)),
4047            "{a_idx:?}"
4048        );
4049        let b_idx = &column_index[0][1];
4050        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
4051    }
4052
4053    #[test]
4054    fn test_disabled_statistics_with_chunk() {
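            // Statistics are disabled globally but enabled at chunk level for column "a" only,
            // so "a" gets chunk-level statistics but neither column gets a column index.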
4055        let file_schema = Schema::new(vec![
4056            Field::new("a", DataType::Utf8, true),
4057            Field::new("b", DataType::Utf8, true),
4058        ]);
4059        let file_schema = Arc::new(file_schema);
4060
4061        let batch = RecordBatch::try_new(
4062            file_schema.clone(),
4063            vec![
4064                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
4065                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
4066            ],
4067        )
4068        .unwrap();
4069
4070        let props = WriterProperties::builder()
4071            .set_statistics_enabled(EnabledStatistics::None)
4072            .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
4073            .build();
4074
4075        let mut buf = Vec::with_capacity(1024);
4076        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
4077        writer.write(&batch).unwrap();
4078
4079        let metadata = writer.close().unwrap();
4080        assert_eq!(metadata.num_row_groups(), 1);
4081        let row_group = metadata.row_group(0);
4082        assert_eq!(row_group.num_columns(), 2);
4083        // Column "a" should only have offset index
4084        assert!(row_group.column(0).offset_index_offset().is_some());
4085        assert!(row_group.column(0).column_index_offset().is_none());
4086        // Column "b" should only have offset index
4087        assert!(row_group.column(1).offset_index_offset().is_some());
4088        assert!(row_group.column(1).column_index_offset().is_none());
4089
4090        let options = ReadOptionsBuilder::new().with_page_index().build();
4091        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
4092
4093        let row_group = reader.get_row_group(0).unwrap();
4094        let a_col = row_group.metadata().column(0);
4095        let b_col = row_group.metadata().column(1);
4096
4097        // Column chunk of column "a" should have chunk level statistics
4098        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
4099            let min = byte_array_stats.min_opt().unwrap();
4100            let max = byte_array_stats.max_opt().unwrap();
4101
4102            assert_eq!(min.as_bytes(), b"a");
4103            assert_eq!(max.as_bytes(), b"d");
4104        } else {
4105            panic!("expecting Statistics::ByteArray");
4106        }
4107
4108        // The column chunk for column "b" shouldn't have statistics
4109        assert!(b_col.statistics().is_none());
4110
4111        let column_index = reader.metadata().column_index().unwrap();
4112        assert_eq!(column_index.len(), 1); // 1 row group
4113        assert_eq!(column_index[0].len(), 2); // 2 columns
4114
4115        let a_idx = &column_index[0][0];
4116        assert!(matches!(a_idx, ColumnIndexMetaData::NONE), "{a_idx:?}");
4117        let b_idx = &column_index[0][1];
4118        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
4119    }
4120
4121    #[test]
4122    fn test_arrow_writer_skip_metadata() {
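            // With skip_arrow_metadata set, the encoded Arrow schema should not be stored in the
            // file's key-value metadata.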
4123        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
4124        let file_schema = Arc::new(batch_schema.clone());
4125
4126        let batch = RecordBatch::try_new(
4127            Arc::new(batch_schema),
4128            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4129        )
4130        .unwrap();
4131        let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
4132
4133        let mut buf = Vec::with_capacity(1024);
4134        let mut writer =
4135            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
4136        writer.write(&batch).unwrap();
4137        writer.close().unwrap();
4138
4139        let bytes = Bytes::from(buf);
4140        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
4141        assert_eq!(file_schema, *reader_builder.schema());
4142        if let Some(key_value_metadata) = reader_builder
4143            .metadata()
4144            .file_metadata()
4145            .key_value_metadata()
4146        {
4147            assert!(
4148                !key_value_metadata
4149                    .iter()
4150                    .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY)
4151            );
4152        }
4153    }
4154
4155    #[test]
    fn test_arrow_writer_explicit_schema() {
        // Write an int32 array using explicit int64 storage
        let batch_schema = Arc::new(Schema::new(vec![Field::new(
            "integers",
            DataType::Int32,
            true,
        )]));
        let parquet_schema = Type::group_type_builder("root")
            .with_fields(vec![Type::primitive_type_builder(
                "integers",
                crate::basic::Type::INT64,
            )
            .build()
            .unwrap()
            .into()])
            .build()
            .unwrap();
        let parquet_schema_descr = SchemaDescriptor::new(parquet_schema.into());

        let batch = RecordBatch::try_new(
            batch_schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let explicit_schema_options =
            ArrowWriterOptions::new().with_parquet_schema(parquet_schema_descr);
        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new_with_options(
            &mut buf,
            batch_schema.clone(),
            explicit_schema_options,
        )
        .unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let bytes = Bytes::from(buf);
        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();

        let expected_schema = Arc::new(Schema::new(vec![Field::new(
            "integers",
            DataType::Int64,
            true,
        )]));
        assert_eq!(reader_builder.schema(), &expected_schema);

        let batches = reader_builder
            .build()
            .unwrap()
            .collect::<Result<Vec<_>, ArrowError>>()
            .unwrap();
        assert_eq!(batches.len(), 1);

        let expected_batch = RecordBatch::try_new(
            expected_schema.clone(),
            vec![Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();
        assert_eq!(batches[0], expected_batch);
    }

    #[test]
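    // Writing a batch whose schema differs from the writer's schema should fail with a
    // descriptive error instead of silently coercing the data.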
    fn mismatched_schemas() {
        let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
        let file_schema = Arc::new(Schema::new(vec![Field::new(
            "temperature",
            DataType::Float64,
            false,
        )]));

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();

        let err = writer.write(&batch).unwrap_err().to_string();
        assert_eq!(
            err,
            "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
        );
    }

    #[test]
    // https://github.com/apache/arrow-rs/issues/6988
    fn test_roundtrip_empty_schema() {
        // create empty record batch with empty schema
        let empty_batch = RecordBatch::try_new_with_options(
            Arc::new(Schema::empty()),
            vec![],
            &RecordBatchOptions::default().with_row_count(Some(0)),
        )
        .unwrap();

        // write to parquet
        let mut parquet_bytes: Vec<u8> = Vec::new();
        let mut writer =
            ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
        writer.write(&empty_batch).unwrap();
        writer.close().unwrap();

        // read from parquet
        let bytes = Bytes::from(parquet_bytes);
        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
        assert_eq!(reader.schema(), &empty_batch.schema());
        let batches: Vec<_> = reader
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();
        assert_eq!(batches.len(), 0);
    }

    #[test]
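    // `EnabledStatistics::Page` alone should not embed statistics in the data page headers;
    // that additionally requires `set_write_page_header_statistics(true)` (see the next test).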
    fn test_page_stats_not_written_by_default() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let schema = Schema::new(vec![string_field]);
        let raw_string_values = vec!["Blart Versenwald III"];
        let string_values = StringArray::from(raw_string_values.clone());
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();

        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let file = roundtrip_opts(&batch, props);

        // read file and decode page headers
        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers

        // decode first page header
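        // a Parquet file begins with the 4-byte "PAR1" magic, so the first page header starts at offset 4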
        let first_page = &file[4..];
        let mut prot = ThriftSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;

        assert!(stats.is_none());
    }

    #[test]
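    // With `set_write_page_header_statistics(true)` the data page header should carry exact
    // min/max values for the page.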
    fn test_page_stats_when_enabled() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let schema = Schema::new(vec![string_field]);
        let raw_string_values = vec!["Blart Versenwald III", "Andrew Lamb"];
        let string_values = StringArray::from(raw_string_values.clone());
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();

        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_write_page_header_statistics(true)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let file = roundtrip_opts(&batch, props);

        // read file and decode page headers
        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers

        // decode first page header
        let first_page = &file[4..];
        let mut prot = ThriftSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;

        let stats = stats.unwrap();
        // check that min/max were actually written to the page
        assert!(stats.is_max_value_exact.unwrap());
        assert!(stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Blart Versenwald III".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Andrew Lamb".as_bytes());
    }

    #[test]
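    // `set_statistics_truncate_length(Some(2))` should truncate the page header min/max to two
    // bytes (the truncated max is incremented to remain an upper bound) and flag both as inexact.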
    fn test_page_stats_truncation() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let binary_field = Field::new("b", DataType::Binary, false);
        let schema = Schema::new(vec![string_field, binary_field]);

        let raw_string_values = vec!["Blart Versenwald III"];
        let raw_binary_values = [b"Blart Versenwald III".to_vec()];
        let raw_binary_value_refs = raw_binary_values
            .iter()
            .map(|x| x.as_slice())
            .collect::<Vec<_>>();

        let string_values = StringArray::from(raw_string_values.clone());
        let binary_values = BinaryArray::from(raw_binary_value_refs);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(string_values), Arc::new(binary_values)],
        )
        .unwrap();

        let props = WriterProperties::builder()
            .set_statistics_truncate_length(Some(2))
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_write_page_header_statistics(true)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let file = roundtrip_opts(&batch, props);

        // read file and decode page headers
        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers

        // decode first page header
        let first_page = &file[4..];
        let mut prot = ThriftSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;
        assert!(stats.is_some());
        let stats = stats.unwrap();
        // check that min/max were properly truncated
        assert!(!stats.is_max_value_exact.unwrap());
        assert!(!stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());

        // check the second page, which belongs to column "b"
        let second_page = &prot.as_slice()[hdr.compressed_page_size as usize..];
        let mut prot = ThriftSliceInputProtocol::new(second_page);
        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;
        assert!(stats.is_some());
        let stats = stats.unwrap();
        // check that min/max were properly truncated
        assert!(!stats.is_max_value_exact.unwrap());
        assert!(!stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
    }

    #[test]
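    // Page encoding stats recorded by the writer should match what a reader decodes from the
    // written file metadata.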
    fn test_page_encoding_statistics_roundtrip() {
        let batch_schema = Schema::new(vec![Field::new(
            "int32",
            arrow_schema::DataType::Int32,
            false,
        )]);

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema.clone()),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let mut file: File = tempfile::tempfile().unwrap();
        let mut writer = ArrowWriter::try_new(&mut file, Arc::new(batch_schema), None).unwrap();
        writer.write(&batch).unwrap();
        let file_metadata = writer.close().unwrap();

        assert_eq!(file_metadata.num_row_groups(), 1);
        assert_eq!(file_metadata.row_group(0).num_columns(), 1);
        assert!(
            file_metadata
                .row_group(0)
                .column(0)
                .page_encoding_stats()
                .is_some()
        );
        let chunk_page_stats = file_metadata
            .row_group(0)
            .column(0)
            .page_encoding_stats()
            .unwrap();

        // check that the read metadata is also correct
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();

        let rowgroup = reader.get_row_group(0).expect("row group missing");
        assert_eq!(rowgroup.num_columns(), 1);
        let column = rowgroup.metadata().column(0);
        assert!(column.page_encoding_stats().is_some());
        let file_page_stats = column.page_encoding_stats().unwrap();
        assert_eq!(chunk_page_stats, file_page_stats);
    }

    #[test]
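    // A per-column dictionary page size limit should override the writer-wide default, yielding
    // differently sized dictionary pages for the two columns.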
    fn test_different_dict_page_size_limit() {
        let array = Arc::new(Int64Array::from_iter(0..1024 * 1024));
        let schema = Arc::new(Schema::new(vec![
            Field::new("col0", arrow_schema::DataType::Int64, false),
            Field::new("col1", arrow_schema::DataType::Int64, false),
        ]));
        let batch =
            arrow_array::RecordBatch::try_new(schema.clone(), vec![array.clone(), array]).unwrap();

        let props = WriterProperties::builder()
            .set_dictionary_page_size_limit(1024 * 1024)
            .set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 1024 * 1024 * 4)
            .build();
        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
        writer.write(&batch).unwrap();
        let data = Bytes::from(writer.into_inner().unwrap());

        let mut metadata = ParquetMetaDataReader::new();
        metadata.try_parse(&data).unwrap();
        let metadata = metadata.finish().unwrap();
        let col0_meta = metadata.row_group(0).column(0);
        let col1_meta = metadata.row_group(0).column(1);

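        // read the first page of each column chunk; with dictionary encoding enabled this is the
        // dictionary page, whose buffer size reflects the configured limit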
        let get_dict_page_size = move |meta: &ColumnChunkMetaData| {
            let mut reader =
                SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
            let page = reader.get_next_page().unwrap().unwrap();
            match page {
                Page::DictionaryPage { buf, .. } => buf.len(),
                _ => panic!("expected DictionaryPage"),
            }
        };

        assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
        assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
    }
}