parquet/arrow/arrow_writer/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Contains a writer that writes Arrow data into Parquet files.
19
20use bytes::Bytes;
21use std::io::{Read, Write};
22use std::iter::Peekable;
23use std::slice::Iter;
24use std::sync::{Arc, Mutex};
25use std::vec::IntoIter;
26use thrift::protocol::TCompactOutputProtocol;
27
28use arrow_array::cast::AsArray;
29use arrow_array::types::*;
30use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
31use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};
32
33use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};
34
35use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
36use crate::arrow::ArrowSchemaConverter;
37use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
38use crate::column::page_encryption::PageEncryptor;
39use crate::column::writer::encoder::ColumnValueEncoder;
40use crate::column::writer::{
41    get_column_writer, ColumnCloseResult, ColumnWriter, GenericColumnWriter,
42};
43use crate::data_type::{ByteArray, FixedLenByteArray};
44#[cfg(feature = "encryption")]
45use crate::encryption::encrypt::FileEncryptor;
46use crate::errors::{ParquetError, Result};
47use crate::file::metadata::{KeyValue, RowGroupMetaData};
48use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
49use crate::file::reader::{ChunkReader, Length};
50use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
51use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
52use crate::thrift::TSerializable;
53use levels::{calculate_array_levels, ArrayLevels};
54
55mod byte_array;
56mod levels;
57
/// Encodes [`RecordBatch`] to parquet
///
/// Writes Arrow `RecordBatch`es to a Parquet writer. Multiple [`RecordBatch`]es will be encoded
/// into the same row group, up to `max_row_group_size` rows. Any remaining rows will be
/// flushed on close, so the final row group in the output file may contain fewer than
/// `max_row_group_size` rows.
64///
65/// # Example: Writing `RecordBatch`es
66/// ```
67/// # use std::sync::Arc;
68/// # use bytes::Bytes;
69/// # use arrow_array::{ArrayRef, Int64Array};
70/// # use arrow_array::RecordBatch;
71/// # use parquet::arrow::arrow_writer::ArrowWriter;
72/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
73/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
74/// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
75///
76/// let mut buffer = Vec::new();
77/// let mut writer = ArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
78/// writer.write(&to_write).unwrap();
79/// writer.close().unwrap();
80///
81/// let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap();
82/// let read = reader.next().unwrap().unwrap();
83///
84/// assert_eq!(to_write, read);
85/// ```
86///
87/// # Memory Usage and Limiting
88///
89/// The nature of Parquet requires buffering of an entire row group before it can
90/// be flushed to the underlying writer. Data is mostly buffered in its encoded
91/// form, reducing memory usage. However, some data such as dictionary keys,
92/// large strings or very nested data may still result in non-trivial memory
93/// usage.
94///
/// See Also:
/// * [`ArrowWriter::memory_size`]: the current memory usage of the writer.
/// * [`ArrowWriter::in_progress_size`]: the estimated size of the buffered row group.
///
/// Call [`Self::flush`] to trigger an early flush of a row group based on a
/// memory threshold and/or global memory pressure. However, smaller row groups
/// result in higher metadata overhead, and thus may worsen compression ratios
/// and query performance.
103///
104/// ```no_run
105/// # use std::io::Write;
106/// # use arrow_array::RecordBatch;
107/// # use parquet::arrow::ArrowWriter;
108/// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
109/// # let batch: RecordBatch = todo!();
110/// writer.write(&batch).unwrap();
111/// // Trigger an early flush if anticipated size exceeds 1_000_000
112/// if writer.in_progress_size() > 1_000_000 {
113///     writer.flush().unwrap();
114/// }
115/// ```
116///
117/// ## Type Support
118///
/// The writer supports writing all Arrow [`DataType`]s that have a direct mapping to
/// Parquet types, including [`StructArray`] and [`ListArray`].
121///
122/// The following are not supported:
123///
124/// * [`IntervalMonthDayNanoArray`]: Parquet does not [support nanosecond intervals].
125///
126/// [`DataType`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html
127/// [`StructArray`]: https://docs.rs/arrow/latest/arrow/array/struct.StructArray.html
128/// [`ListArray`]: https://docs.rs/arrow/latest/arrow/array/type.ListArray.html
129/// [`IntervalMonthDayNanoArray`]: https://docs.rs/arrow/latest/arrow/array/type.IntervalMonthDayNanoArray.html
130/// [support nanosecond intervals]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
131pub struct ArrowWriter<W: Write> {
132    /// Underlying Parquet writer
133    writer: SerializedFileWriter<W>,
134
135    /// The in-progress row group if any
136    in_progress: Option<ArrowRowGroupWriter>,
137
138    /// A copy of the Arrow schema.
139    ///
140    /// The schema is used to verify that each record batch written has the correct schema
141    arrow_schema: SchemaRef,
142
143    /// Creates new [`ArrowRowGroupWriter`] instances as required
144    row_group_writer_factory: ArrowRowGroupWriterFactory,
145
146    /// The length of arrays to write to each row group
147    max_row_group_size: usize,
148}
149
150impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
151    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152        let buffered_memory = self.in_progress_size();
153        f.debug_struct("ArrowWriter")
154            .field("writer", &self.writer)
155            .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
156            .field("in_progress_rows", &self.in_progress_rows())
157            .field("arrow_schema", &self.arrow_schema)
158            .field("max_row_group_size", &self.max_row_group_size)
159            .finish()
160    }
161}
162
163impl<W: Write + Send> ArrowWriter<W> {
164    /// Try to create a new Arrow writer
165    ///
166    /// The writer will fail if:
167    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
168    ///  * the Arrow schema contains unsupported datatypes such as Unions
169    pub fn try_new(
170        writer: W,
171        arrow_schema: SchemaRef,
172        props: Option<WriterProperties>,
173    ) -> Result<Self> {
174        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
175        Self::try_new_with_options(writer, arrow_schema, options)
176    }
177
178    /// Try to create a new Arrow writer with [`ArrowWriterOptions`].
179    ///
180    /// The writer will fail if:
181    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
182    ///  * the Arrow schema contains unsupported datatypes such as Unions
183    pub fn try_new_with_options(
184        writer: W,
185        arrow_schema: SchemaRef,
186        options: ArrowWriterOptions,
187    ) -> Result<Self> {
188        let mut props = options.properties;
189        let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
190        if let Some(schema_root) = &options.schema_root {
191            converter = converter.schema_root(schema_root);
192        }
193        let schema = converter.convert(&arrow_schema)?;
194        if !options.skip_arrow_metadata {
195            // add serialized arrow schema
196            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
197        }
198
199        let max_row_group_size = props.max_row_group_size();
200
201        let file_writer =
202            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::new(props))?;
203
204        let row_group_writer_factory = ArrowRowGroupWriterFactory::new(&file_writer);
205
206        Ok(Self {
207            writer: file_writer,
208            in_progress: None,
209            arrow_schema,
210            row_group_writer_factory,
211            max_row_group_size,
212        })
213    }
214
215    /// Returns metadata for any flushed row groups
216    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
217        self.writer.flushed_row_groups()
218    }
219
220    /// Estimated memory usage, in bytes, of this `ArrowWriter`
221    ///
    /// This estimate is formed by summing the values of
    /// [`ArrowColumnWriter::memory_size`] for all in-progress columns.
224    pub fn memory_size(&self) -> usize {
225        match &self.in_progress {
226            Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
227            None => 0,
228        }
229    }
230
    /// Anticipated encoded size of the in-progress row group.
    ///
    /// This estimates the size of the row group after it has been completely
    /// encoded, and is formed by summing the values of
    /// [`ArrowColumnWriter::get_estimated_total_bytes`] for all in-progress
    /// columns.
237    pub fn in_progress_size(&self) -> usize {
238        match &self.in_progress {
239            Some(in_progress) => in_progress
240                .writers
241                .iter()
242                .map(|x| x.get_estimated_total_bytes())
243                .sum(),
244            None => 0,
245        }
246    }
247
    /// Returns the number of rows buffered in the in-progress row group
249    pub fn in_progress_rows(&self) -> usize {
250        self.in_progress
251            .as_ref()
252            .map(|x| x.buffered_rows)
253            .unwrap_or_default()
254    }
255
256    /// Returns the number of bytes written by this instance
257    pub fn bytes_written(&self) -> usize {
258        self.writer.bytes_written()
259    }
260
261    /// Encodes the provided [`RecordBatch`]
262    ///
263    /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`]
264    /// rows, the contents of `batch` will be written to one or more row groups such that all but
265    /// the final row group in the file contain [`WriterProperties::max_row_group_size`] rows.
266    ///
267    /// This will fail if the `batch`'s schema does not match the writer's schema.
268    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
269        if batch.num_rows() == 0 {
270            return Ok(());
271        }
272
273        let in_progress = match &mut self.in_progress {
274            Some(in_progress) => in_progress,
275            x => x.insert(self.row_group_writer_factory.create_row_group_writer(
276                self.writer.schema_descr(),
277                self.writer.properties(),
278                &self.arrow_schema,
279                self.writer.flushed_row_groups().len(),
280            )?),
281        };
282
        // If this would exceed max_row_group_size, split the batch
284        if in_progress.buffered_rows + batch.num_rows() > self.max_row_group_size {
285            let to_write = self.max_row_group_size - in_progress.buffered_rows;
286            let a = batch.slice(0, to_write);
287            let b = batch.slice(to_write, batch.num_rows() - to_write);
288            self.write(&a)?;
289            return self.write(&b);
290        }
291
292        in_progress.write(batch)?;
293
294        if in_progress.buffered_rows >= self.max_row_group_size {
295            self.flush()?
296        }
297        Ok(())
298    }
299
300    /// Flushes all buffered rows into a new row group
301    pub fn flush(&mut self) -> Result<()> {
302        let in_progress = match self.in_progress.take() {
303            Some(in_progress) => in_progress,
304            None => return Ok(()),
305        };
306
307        let mut row_group_writer = self.writer.next_row_group()?;
308        for chunk in in_progress.close()? {
309            chunk.append_to_row_group(&mut row_group_writer)?;
310        }
311        row_group_writer.close()?;
312        Ok(())
313    }
314
    /// Appends [`KeyValue`] metadata to be written in addition to that from [`WriterProperties`]
    ///
    /// This method provides a way to append key-value metadata after writing `RecordBatch`es.
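    ///
    /// # Example
    ///
    /// A small sketch of appending an extra key-value entry after writing data
    /// (the key and value shown are illustrative):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    /// # use parquet::arrow::ArrowWriter;
    /// # use parquet::file::metadata::KeyValue;
    /// let col = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
    /// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
    ///
    /// let mut buffer = Vec::new();
    /// let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
    /// writer.write(&batch).unwrap();
    ///
    /// // Record an additional application-specific entry in the file footer metadata
    /// writer.append_key_value_metadata(KeyValue::new("my_key".to_string(), "my_value".to_string()));
    /// writer.close().unwrap();
    /// ```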
318    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
319        self.writer.append_key_value_metadata(kv_metadata)
320    }
321
322    /// Returns a reference to the underlying writer.
323    pub fn inner(&self) -> &W {
324        self.writer.inner()
325    }
326
327    /// Returns a mutable reference to the underlying writer.
328    ///
    /// It is inadvisable to write directly to the underlying writer, as doing so
    /// will likely result in a corrupt Parquet file.
331    pub fn inner_mut(&mut self) -> &mut W {
332        self.writer.inner_mut()
333    }
334
335    /// Flushes any outstanding data and returns the underlying writer.
336    pub fn into_inner(mut self) -> Result<W> {
337        self.flush()?;
338        self.writer.into_inner()
339    }
340
341    /// Close and finalize the underlying Parquet writer
342    ///
343    /// Unlike [`Self::close`] this does not consume self
344    ///
345    /// Attempting to write after calling finish will result in an error
346    pub fn finish(&mut self) -> Result<crate::format::FileMetaData> {
347        self.flush()?;
348        self.writer.finish()
349    }
350
351    /// Close and finalize the underlying Parquet writer
352    pub fn close(mut self) -> Result<crate::format::FileMetaData> {
353        self.finish()
354    }
355}
356
357impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
358    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
359        self.write(batch).map_err(|e| e.into())
360    }
361
362    fn close(self) -> std::result::Result<(), ArrowError> {
363        self.close()?;
364        Ok(())
365    }
366}
367
368/// Arrow-specific configuration settings for writing parquet files.
369///
370/// See [`ArrowWriter`] for how to configure the writer.
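///
/// # Example
///
/// A minimal sketch of building options and passing them to
/// [`ArrowWriter::try_new_with_options`] (the property values chosen here are illustrative):
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
/// # use parquet::arrow::arrow_writer::{ArrowWriter, ArrowWriterOptions};
/// # use parquet::file::properties::WriterProperties;
/// let col = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
/// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
///
/// // Configure writer properties and skip embedding the Arrow schema in the file metadata
/// let options = ArrowWriterOptions::new()
///     .with_properties(WriterProperties::default())
///     .with_skip_arrow_metadata(true);
///
/// let mut buffer = Vec::new();
/// let mut writer =
///     ArrowWriter::try_new_with_options(&mut buffer, batch.schema(), options).unwrap();
/// writer.write(&batch).unwrap();
/// writer.close().unwrap();
/// ```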
371#[derive(Debug, Clone, Default)]
372pub struct ArrowWriterOptions {
373    properties: WriterProperties,
374    skip_arrow_metadata: bool,
375    schema_root: Option<String>,
376}
377
378impl ArrowWriterOptions {
379    /// Creates a new [`ArrowWriterOptions`] with the default settings.
380    pub fn new() -> Self {
381        Self::default()
382    }
383
384    /// Sets the [`WriterProperties`] for writing parquet files.
385    pub fn with_properties(self, properties: WriterProperties) -> Self {
386        Self { properties, ..self }
387    }
388
    /// Skip encoding the embedded Arrow metadata (defaults to `false`)
    ///
    /// Parquet files generated by the [`ArrowWriter`] contain the embedded Arrow schema
    /// by default.
    ///
    /// Set `skip_arrow_metadata` to `true` to skip encoding the embedded metadata.
395    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
396        Self {
397            skip_arrow_metadata,
398            ..self
399        }
400    }
401
402    /// Set the name of the root parquet schema element (defaults to `"arrow_schema"`)
403    pub fn with_schema_root(self, schema_root: String) -> Self {
404        Self {
405            schema_root: Some(schema_root),
406            ..self
407        }
408    }
409}
410
411/// A single column chunk produced by [`ArrowColumnWriter`]
412#[derive(Default)]
413struct ArrowColumnChunkData {
414    length: usize,
415    data: Vec<Bytes>,
416}
417
418impl Length for ArrowColumnChunkData {
419    fn len(&self) -> u64 {
420        self.length as _
421    }
422}
423
424impl ChunkReader for ArrowColumnChunkData {
425    type T = ArrowColumnChunkReader;
426
427    fn get_read(&self, start: u64) -> Result<Self::T> {
428        assert_eq!(start, 0); // Assume append_column writes all data in one-shot
429        Ok(ArrowColumnChunkReader(
430            self.data.clone().into_iter().peekable(),
431        ))
432    }
433
434    fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
435        unimplemented!()
436    }
437}
438
439/// A [`Read`] for [`ArrowColumnChunkData`]
440struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
441
442impl Read for ArrowColumnChunkReader {
443    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
444        let buffer = loop {
445            match self.0.peek_mut() {
446                Some(b) if b.is_empty() => {
447                    self.0.next();
448                    continue;
449                }
450                Some(b) => break b,
451                None => return Ok(0),
452            }
453        };
454
455        let len = buffer.len().min(out.len());
456        let b = buffer.split_to(len);
457        out[..len].copy_from_slice(&b);
458        Ok(len)
459    }
460}
461
462/// A shared [`ArrowColumnChunkData`]
463///
464/// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access via
465/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows
466type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
467
468#[derive(Default)]
469struct ArrowPageWriter {
470    buffer: SharedColumnChunk,
471    #[cfg(feature = "encryption")]
472    page_encryptor: Option<PageEncryptor>,
473}
474
475impl ArrowPageWriter {
476    #[cfg(feature = "encryption")]
477    pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
478        self.page_encryptor = page_encryptor;
479        self
480    }
481
482    #[cfg(feature = "encryption")]
483    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
484        self.page_encryptor.as_mut()
485    }
486
487    #[cfg(not(feature = "encryption"))]
488    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
489        None
490    }
491}
492
493impl PageWriter for ArrowPageWriter {
494    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
495        let page = match self.page_encryptor_mut() {
496            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
497            None => page,
498        };
499
500        let page_header = page.to_thrift_header();
501        let header = {
502            let mut header = Vec::with_capacity(1024);
503
504            match self.page_encryptor_mut() {
505                Some(page_encryptor) => {
506                    page_encryptor.encrypt_page_header(&page_header, &mut header)?;
507                    if page.compressed_page().is_data_page() {
508                        page_encryptor.increment_page();
509                    }
510                }
511                None => {
512                    let mut protocol = TCompactOutputProtocol::new(&mut header);
513                    page_header.write_to_out_protocol(&mut protocol)?;
514                }
515            };
516
517            Bytes::from(header)
518        };
519
520        let mut buf = self.buffer.try_lock().unwrap();
521
522        let data = page.compressed_page().buffer().clone();
523        let compressed_size = data.len() + header.len();
524
525        let mut spec = PageWriteSpec::new();
526        spec.page_type = page.page_type();
527        spec.num_values = page.num_values();
528        spec.uncompressed_size = page.uncompressed_size() + header.len();
529        spec.offset = buf.length as u64;
530        spec.compressed_size = compressed_size;
531        spec.bytes_written = compressed_size as u64;
532
533        buf.length += compressed_size;
534        buf.data.push(header);
535        buf.data.push(data);
536
537        Ok(spec)
538    }
539
540    fn close(&mut self) -> Result<()> {
541        Ok(())
542    }
543}
544
545/// A leaf column that can be encoded by [`ArrowColumnWriter`]
546#[derive(Debug)]
547pub struct ArrowLeafColumn(ArrayLevels);
548
549/// Computes the [`ArrowLeafColumn`] for a potentially nested [`ArrayRef`]
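///
/// A small sketch: a struct of two primitive children yields one [`ArrowLeafColumn`]
/// per child (the field names used here are illustrative):
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int32Array, StructArray};
/// # use arrow_schema::{DataType, Field, Fields};
/// # use parquet::arrow::arrow_writer::compute_leaves;
/// let a = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
/// let b = Arc::new(Int32Array::from(vec![3, 4])) as ArrayRef;
/// let fields = Fields::from(vec![
///     Field::new("a", DataType::Int32, false),
///     Field::new("b", DataType::Int32, false),
/// ]);
/// let array = Arc::new(StructArray::new(fields.clone(), vec![a, b], None)) as ArrayRef;
/// let field = Field::new("s", DataType::Struct(fields), false);
///
/// // Each primitive leaf of the (potentially nested) array becomes one ArrowLeafColumn
/// let leaves = compute_leaves(&field, &array).unwrap();
/// assert_eq!(leaves.len(), 2);
/// ```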
550pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
551    let levels = calculate_array_levels(array, field)?;
552    Ok(levels.into_iter().map(ArrowLeafColumn).collect())
553}
554
555/// The data for a single column chunk, see [`ArrowColumnWriter`]
556pub struct ArrowColumnChunk {
557    data: ArrowColumnChunkData,
558    close: ColumnCloseResult,
559}
560
561impl std::fmt::Debug for ArrowColumnChunk {
562    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
563        f.debug_struct("ArrowColumnChunk")
564            .field("length", &self.data.length)
565            .finish_non_exhaustive()
566    }
567}
568
569impl ArrowColumnChunk {
570    /// Calls [`SerializedRowGroupWriter::append_column`] with this column's data
571    pub fn append_to_row_group<W: Write + Send>(
572        self,
573        writer: &mut SerializedRowGroupWriter<'_, W>,
574    ) -> Result<()> {
575        writer.append_column(&self.data, self.close)
576    }
577}
578
579/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
580///
581/// Note: This is a low-level interface for applications that require
582/// fine-grained control of encoding (e.g. encoding using multiple threads),
583/// see [`ArrowWriter`] for a higher-level interface
584///
/// # Example: Encoding two Arrow Arrays in Parallel
586/// ```
587/// // The arrow schema
588/// # use std::sync::Arc;
589/// # use arrow_array::*;
590/// # use arrow_schema::*;
591/// # use parquet::arrow::ArrowSchemaConverter;
592/// # use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, get_column_writers, ArrowColumnChunk};
593/// # use parquet::file::properties::WriterProperties;
594/// # use parquet::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
595/// #
596/// let schema = Arc::new(Schema::new(vec![
597///     Field::new("i32", DataType::Int32, false),
598///     Field::new("f32", DataType::Float32, false),
599/// ]));
600///
601/// // Compute the parquet schema
602/// let props = Arc::new(WriterProperties::default());
603/// let parquet_schema = ArrowSchemaConverter::new()
604///   .with_coerce_types(props.coerce_types())
605///   .convert(&schema)
606///   .unwrap();
607///
608/// // Create writers for each of the leaf columns
609/// let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
610///
611/// // Spawn a worker thread for each column
612/// //
/// // Note: This is for demonstration purposes; a thread pool (e.g. rayon or tokio) would be better.
/// // The `map` produces an iterator of `(thread handle, send channel)` tuples.
615/// let mut workers: Vec<_> = col_writers
616///     .into_iter()
617///     .map(|mut col_writer| {
618///         let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
619///         let handle = std::thread::spawn(move || {
620///             // receive Arrays to encode via the channel
621///             for col in recv {
622///                 col_writer.write(&col)?;
623///             }
624///             // once the input is complete, close the writer
625///             // to return the newly created ArrowColumnChunk
626///             col_writer.close()
627///         });
628///         (handle, send)
629///     })
630///     .collect();
631///
632/// // Create parquet writer
633/// let root_schema = parquet_schema.root_schema_ptr();
634/// // write to memory in the example, but this could be a File
635/// let mut out = Vec::with_capacity(1024);
636/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone())
637///   .unwrap();
638///
639/// // Start row group
640/// let mut row_group_writer: SerializedRowGroupWriter<'_, _> = writer
641///   .next_row_group()
642///   .unwrap();
643///
644/// // Create some example input columns to encode
645/// let to_write = vec![
646///     Arc::new(Int32Array::from_iter_values([1, 2, 3])) as _,
647///     Arc::new(Float32Array::from_iter_values([1., 45., -1.])) as _,
648/// ];
649///
650/// // Send the input columns to the workers
651/// let mut worker_iter = workers.iter_mut();
652/// for (arr, field) in to_write.iter().zip(&schema.fields) {
653///     for leaves in compute_leaves(field, arr).unwrap() {
654///         worker_iter.next().unwrap().1.send(leaves).unwrap();
655///     }
656/// }
657///
658/// // Wait for the workers to complete encoding, and append
659/// // the resulting column chunks to the row group (and the file)
660/// for (handle, send) in workers {
661///     drop(send); // Drop send side to signal termination
662///     // wait for the worker to send the completed chunk
663///     let chunk: ArrowColumnChunk = handle.join().unwrap().unwrap();
664///     chunk.append_to_row_group(&mut row_group_writer).unwrap();
665/// }
666/// // Close the row group which writes to the underlying file
667/// row_group_writer.close().unwrap();
668///
669/// let metadata = writer.close().unwrap();
670/// assert_eq!(metadata.num_rows, 3);
671/// ```
672pub struct ArrowColumnWriter {
673    writer: ArrowColumnWriterImpl,
674    chunk: SharedColumnChunk,
675}
676
677impl std::fmt::Debug for ArrowColumnWriter {
678    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
679        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
680    }
681}
682
683enum ArrowColumnWriterImpl {
684    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
685    Column(ColumnWriter<'static>),
686}
687
688impl ArrowColumnWriter {
689    /// Write an [`ArrowLeafColumn`]
690    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
691        match &mut self.writer {
692            ArrowColumnWriterImpl::Column(c) => {
693                write_leaf(c, &col.0)?;
694            }
695            ArrowColumnWriterImpl::ByteArray(c) => {
696                write_primitive(c, col.0.array().as_ref(), &col.0)?;
697            }
698        }
699        Ok(())
700    }
701
702    /// Close this column returning the written [`ArrowColumnChunk`]
703    pub fn close(self) -> Result<ArrowColumnChunk> {
704        let close = match self.writer {
705            ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
706            ArrowColumnWriterImpl::Column(c) => c.close()?,
707        };
708        let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
709        let data = chunk.into_inner().unwrap();
710        Ok(ArrowColumnChunk { data, close })
711    }
712
    /// Returns the estimated total memory usage of the writer.
    ///
    /// Unlike [`Self::get_estimated_total_bytes`], this is an estimate
    /// of the current memory usage and not its anticipated encoded size.
717    ///
718    /// This includes:
719    /// 1. Data buffered in encoded form
720    /// 2. Data buffered in un-encoded form (e.g. `usize` dictionary keys)
721    ///
722    /// This value should be greater than or equal to [`Self::get_estimated_total_bytes`]
723    pub fn memory_size(&self) -> usize {
724        match &self.writer {
725            ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
726            ArrowColumnWriterImpl::Column(c) => c.memory_size(),
727        }
728    }
729
730    /// Returns the estimated total encoded bytes for this column writer.
731    ///
732    /// This includes:
733    /// 1. Data buffered in encoded form
734    /// 2. An estimate of how large the data buffered in un-encoded form would be once encoded
735    ///
736    /// This value should be less than or equal to [`Self::memory_size`]
737    pub fn get_estimated_total_bytes(&self) -> usize {
738        match &self.writer {
739            ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
740            ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
741        }
742    }
743}
744
745/// Encodes [`RecordBatch`] to a parquet row group
746struct ArrowRowGroupWriter {
747    writers: Vec<ArrowColumnWriter>,
748    schema: SchemaRef,
749    buffered_rows: usize,
750}
751
752impl ArrowRowGroupWriter {
753    fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
754        Self {
755            writers,
756            schema: arrow.clone(),
757            buffered_rows: 0,
758        }
759    }
760
761    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
762        self.buffered_rows += batch.num_rows();
763        let mut writers = self.writers.iter_mut();
764        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
765            for leaf in compute_leaves(field.as_ref(), column)? {
766                writers.next().unwrap().write(&leaf)?
767            }
768        }
769        Ok(())
770    }
771
772    fn close(self) -> Result<Vec<ArrowColumnChunk>> {
773        self.writers
774            .into_iter()
775            .map(|writer| writer.close())
776            .collect()
777    }
778}
779
780struct ArrowRowGroupWriterFactory {
781    #[cfg(feature = "encryption")]
782    file_encryptor: Option<Arc<FileEncryptor>>,
783}
784
785impl ArrowRowGroupWriterFactory {
786    #[cfg(feature = "encryption")]
787    fn new<W: Write + Send>(file_writer: &SerializedFileWriter<W>) -> Self {
788        Self {
789            file_encryptor: file_writer.file_encryptor(),
790        }
791    }
792
793    #[cfg(not(feature = "encryption"))]
794    fn new<W: Write + Send>(_file_writer: &SerializedFileWriter<W>) -> Self {
795        Self {}
796    }
797
798    #[cfg(feature = "encryption")]
799    fn create_row_group_writer(
800        &self,
801        parquet: &SchemaDescriptor,
802        props: &WriterPropertiesPtr,
803        arrow: &SchemaRef,
804        row_group_index: usize,
805    ) -> Result<ArrowRowGroupWriter> {
806        let writers = get_column_writers_with_encryptor(
807            parquet,
808            props,
809            arrow,
810            self.file_encryptor.clone(),
811            row_group_index,
812        )?;
813        Ok(ArrowRowGroupWriter::new(writers, arrow))
814    }
815
816    #[cfg(not(feature = "encryption"))]
817    fn create_row_group_writer(
818        &self,
819        parquet: &SchemaDescriptor,
820        props: &WriterPropertiesPtr,
821        arrow: &SchemaRef,
822        _row_group_index: usize,
823    ) -> Result<ArrowRowGroupWriter> {
824        let writers = get_column_writers(parquet, props, arrow)?;
825        Ok(ArrowRowGroupWriter::new(writers, arrow))
826    }
827}
828
/// Returns the [`ArrowColumnWriter`]s for a given schema
830pub fn get_column_writers(
831    parquet: &SchemaDescriptor,
832    props: &WriterPropertiesPtr,
833    arrow: &SchemaRef,
834) -> Result<Vec<ArrowColumnWriter>> {
835    let mut writers = Vec::with_capacity(arrow.fields.len());
836    let mut leaves = parquet.columns().iter();
837    let column_factory = ArrowColumnWriterFactory::new();
838    for field in &arrow.fields {
839        column_factory.get_arrow_column_writer(
840            field.data_type(),
841            props,
842            &mut leaves,
843            &mut writers,
844        )?;
845    }
846    Ok(writers)
847}
848
/// Returns the [`ArrowColumnWriter`]s for a given schema, supporting columnar encryption
850#[cfg(feature = "encryption")]
851fn get_column_writers_with_encryptor(
852    parquet: &SchemaDescriptor,
853    props: &WriterPropertiesPtr,
854    arrow: &SchemaRef,
855    file_encryptor: Option<Arc<FileEncryptor>>,
856    row_group_index: usize,
857) -> Result<Vec<ArrowColumnWriter>> {
858    let mut writers = Vec::with_capacity(arrow.fields.len());
859    let mut leaves = parquet.columns().iter();
860    let column_factory =
861        ArrowColumnWriterFactory::new().with_file_encryptor(row_group_index, file_encryptor);
862    for field in &arrow.fields {
863        column_factory.get_arrow_column_writer(
864            field.data_type(),
865            props,
866            &mut leaves,
867            &mut writers,
868        )?;
869    }
870    Ok(writers)
871}
872
873/// Gets [`ArrowColumnWriter`] instances for different data types
874struct ArrowColumnWriterFactory {
875    #[cfg(feature = "encryption")]
876    row_group_index: usize,
877    #[cfg(feature = "encryption")]
878    file_encryptor: Option<Arc<FileEncryptor>>,
879}
880
881impl ArrowColumnWriterFactory {
882    pub fn new() -> Self {
883        Self {
884            #[cfg(feature = "encryption")]
885            row_group_index: 0,
886            #[cfg(feature = "encryption")]
887            file_encryptor: None,
888        }
889    }
890
891    #[cfg(feature = "encryption")]
892    pub fn with_file_encryptor(
893        mut self,
894        row_group_index: usize,
895        file_encryptor: Option<Arc<FileEncryptor>>,
896    ) -> Self {
897        self.row_group_index = row_group_index;
898        self.file_encryptor = file_encryptor;
899        self
900    }
901
902    #[cfg(feature = "encryption")]
903    fn create_page_writer(
904        &self,
905        column_descriptor: &ColumnDescPtr,
906        column_index: usize,
907    ) -> Result<Box<ArrowPageWriter>> {
908        let column_path = column_descriptor.path().string();
909        let page_encryptor = PageEncryptor::create_if_column_encrypted(
910            &self.file_encryptor,
911            self.row_group_index,
912            column_index,
913            &column_path,
914        )?;
915        Ok(Box::new(
916            ArrowPageWriter::default().with_encryptor(page_encryptor),
917        ))
918    }
919
920    #[cfg(not(feature = "encryption"))]
921    fn create_page_writer(
922        &self,
923        _column_descriptor: &ColumnDescPtr,
924        _column_index: usize,
925    ) -> Result<Box<ArrowPageWriter>> {
926        Ok(Box::<ArrowPageWriter>::default())
927    }
928
929    /// Gets the [`ArrowColumnWriter`] for the given `data_type`
930    fn get_arrow_column_writer(
931        &self,
932        data_type: &ArrowDataType,
933        props: &WriterPropertiesPtr,
934        leaves: &mut Iter<'_, ColumnDescPtr>,
935        out: &mut Vec<ArrowColumnWriter>,
936    ) -> Result<()> {
937        let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
938            let page_writer = self.create_page_writer(desc, out.len())?;
939            let chunk = page_writer.buffer.clone();
940            let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
941            Ok(ArrowColumnWriter {
942                chunk,
943                writer: ArrowColumnWriterImpl::Column(writer),
944            })
945        };
946
947        let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
948            let page_writer = self.create_page_writer(desc, out.len())?;
949            let chunk = page_writer.buffer.clone();
950            let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
951            Ok(ArrowColumnWriter {
952                chunk,
953                writer: ArrowColumnWriterImpl::ByteArray(writer),
954            })
955        };
956
957        match data_type {
958            _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
                out.push(col(leaves.next().unwrap())?)
            }
960            ArrowDataType::LargeBinary
961            | ArrowDataType::Binary
962            | ArrowDataType::Utf8
963            | ArrowDataType::LargeUtf8
964            | ArrowDataType::BinaryView
965            | ArrowDataType::Utf8View => {
966                out.push(bytes(leaves.next().unwrap())?)
967            }
968            ArrowDataType::List(f)
969            | ArrowDataType::LargeList(f)
970            | ArrowDataType::FixedSizeList(f, _) => {
971                self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
972            }
973            ArrowDataType::Struct(fields) => {
974                for field in fields {
975                    self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
976                }
977            }
978            ArrowDataType::Map(f, _) => match f.data_type() {
979                ArrowDataType::Struct(f) => {
980                    self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
981                    self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
982                }
983                _ => unreachable!("invalid map type"),
984            }
985            ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
986                ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Binary | ArrowDataType::LargeBinary => {
987                    out.push(bytes(leaves.next().unwrap())?)
988                }
989                ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
990                    out.push(bytes(leaves.next().unwrap())?)
991                }
992                ArrowDataType::FixedSizeBinary(_) => {
993                    out.push(bytes(leaves.next().unwrap())?)
994                }
995                _ => {
996                    out.push(col(leaves.next().unwrap())?)
997                }
998            }
999            _ => return Err(ParquetError::NYI(
1000                format!(
1001                    "Attempting to write an Arrow type {data_type:?} to parquet that is not yet implemented"
1002                )
1003            ))
1004        }
1005        Ok(())
1006    }
1007}
1008
1009fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
1010    let column = levels.array().as_ref();
1011    let indices = levels.non_null_indices();
1012    match writer {
1013        ColumnWriter::Int32ColumnWriter(ref mut typed) => {
1014            match column.data_type() {
1015                ArrowDataType::Date64 => {
1016                    // If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
1017                    let array = arrow_cast::cast(column, &ArrowDataType::Date32)?;
1018                    let array = arrow_cast::cast(&array, &ArrowDataType::Int32)?;
1019
1020                    let array = array.as_primitive::<Int32Type>();
1021                    write_primitive(typed, array.values(), levels)
1022                }
1023                ArrowDataType::UInt32 => {
1024                    let values = column.as_primitive::<UInt32Type>().values();
                    // follow C++ implementation and use overflow/reinterpret cast from u32 to i32 which will map
1026                    // `(i32::MAX as u32)..u32::MAX` to `i32::MIN..0`
1027                    let array = values.inner().typed_data::<i32>();
1028                    write_primitive(typed, array, levels)
1029                }
1030                ArrowDataType::Decimal128(_, _) => {
1031                    // use the int32 to represent the decimal with low precision
1032                    let array = column
1033                        .as_primitive::<Decimal128Type>()
1034                        .unary::<_, Int32Type>(|v| v as i32);
1035                    write_primitive(typed, array.values(), levels)
1036                }
1037                ArrowDataType::Decimal256(_, _) => {
1038                    // use the int32 to represent the decimal with low precision
1039                    let array = column
1040                        .as_primitive::<Decimal256Type>()
1041                        .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1042                    write_primitive(typed, array.values(), levels)
1043                }
1044                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1045                    ArrowDataType::Decimal128(_, _) => {
1046                        let array = arrow_cast::cast(column, value_type)?;
1047                        let array = array
1048                            .as_primitive::<Decimal128Type>()
1049                            .unary::<_, Int32Type>(|v| v as i32);
1050                        write_primitive(typed, array.values(), levels)
1051                    }
1052                    ArrowDataType::Decimal256(_, _) => {
1053                        let array = arrow_cast::cast(column, value_type)?;
1054                        let array = array
1055                            .as_primitive::<Decimal256Type>()
1056                            .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1057                        write_primitive(typed, array.values(), levels)
1058                    }
1059                    _ => {
1060                        let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
1061                        let array = array.as_primitive::<Int32Type>();
1062                        write_primitive(typed, array.values(), levels)
1063                    }
1064                },
1065                _ => {
1066                    let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
1067                    let array = array.as_primitive::<Int32Type>();
1068                    write_primitive(typed, array.values(), levels)
1069                }
1070            }
1071        }
1072        ColumnWriter::BoolColumnWriter(ref mut typed) => {
1073            let array = column.as_boolean();
1074            typed.write_batch(
1075                get_bool_array_slice(array, indices).as_slice(),
1076                levels.def_levels(),
1077                levels.rep_levels(),
1078            )
1079        }
1080        ColumnWriter::Int64ColumnWriter(ref mut typed) => {
1081            match column.data_type() {
1082                ArrowDataType::Date64 => {
1083                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1084
1085                    let array = array.as_primitive::<Int64Type>();
1086                    write_primitive(typed, array.values(), levels)
1087                }
1088                ArrowDataType::Int64 => {
1089                    let array = column.as_primitive::<Int64Type>();
1090                    write_primitive(typed, array.values(), levels)
1091                }
1092                ArrowDataType::UInt64 => {
1093                    let values = column.as_primitive::<UInt64Type>().values();
                    // follow C++ implementation and use overflow/reinterpret cast from u64 to i64 which will map
1095                    // `(i64::MAX as u64)..u64::MAX` to `i64::MIN..0`
1096                    let array = values.inner().typed_data::<i64>();
1097                    write_primitive(typed, array, levels)
1098                }
1099                ArrowDataType::Decimal128(_, _) => {
1100                    // use the int64 to represent the decimal with low precision
1101                    let array = column
1102                        .as_primitive::<Decimal128Type>()
1103                        .unary::<_, Int64Type>(|v| v as i64);
1104                    write_primitive(typed, array.values(), levels)
1105                }
1106                ArrowDataType::Decimal256(_, _) => {
1107                    // use the int64 to represent the decimal with low precision
1108                    let array = column
1109                        .as_primitive::<Decimal256Type>()
1110                        .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1111                    write_primitive(typed, array.values(), levels)
1112                }
1113                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1114                    ArrowDataType::Decimal128(_, _) => {
1115                        let array = arrow_cast::cast(column, value_type)?;
1116                        let array = array
1117                            .as_primitive::<Decimal128Type>()
1118                            .unary::<_, Int64Type>(|v| v as i64);
1119                        write_primitive(typed, array.values(), levels)
1120                    }
1121                    ArrowDataType::Decimal256(_, _) => {
1122                        let array = arrow_cast::cast(column, value_type)?;
1123                        let array = array
1124                            .as_primitive::<Decimal256Type>()
1125                            .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1126                        write_primitive(typed, array.values(), levels)
1127                    }
1128                    _ => {
1129                        let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1130                        let array = array.as_primitive::<Int64Type>();
1131                        write_primitive(typed, array.values(), levels)
1132                    }
1133                },
1134                _ => {
1135                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1136                    let array = array.as_primitive::<Int64Type>();
1137                    write_primitive(typed, array.values(), levels)
1138                }
1139            }
1140        }
1141        ColumnWriter::Int96ColumnWriter(ref mut _typed) => {
1142            unreachable!("Currently unreachable because data type not supported")
1143        }
1144        ColumnWriter::FloatColumnWriter(ref mut typed) => {
1145            let array = column.as_primitive::<Float32Type>();
1146            write_primitive(typed, array.values(), levels)
1147        }
1148        ColumnWriter::DoubleColumnWriter(ref mut typed) => {
1149            let array = column.as_primitive::<Float64Type>();
1150            write_primitive(typed, array.values(), levels)
1151        }
1152        ColumnWriter::ByteArrayColumnWriter(_) => {
1153            unreachable!("should use ByteArrayWriter")
1154        }
1155        ColumnWriter::FixedLenByteArrayColumnWriter(ref mut typed) => {
1156            let bytes = match column.data_type() {
1157                ArrowDataType::Interval(interval_unit) => match interval_unit {
1158                    IntervalUnit::YearMonth => {
1159                        let array = column
1160                            .as_any()
1161                            .downcast_ref::<arrow_array::IntervalYearMonthArray>()
1162                            .unwrap();
1163                        get_interval_ym_array_slice(array, indices)
1164                    }
1165                    IntervalUnit::DayTime => {
1166                        let array = column
1167                            .as_any()
1168                            .downcast_ref::<arrow_array::IntervalDayTimeArray>()
1169                            .unwrap();
1170                        get_interval_dt_array_slice(array, indices)
1171                    }
1172                    _ => {
1173                        return Err(ParquetError::NYI(
1174                            format!(
1175                                "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
1176                            )
1177                        ));
1178                    }
1179                },
1180                ArrowDataType::FixedSizeBinary(_) => {
1181                    let array = column
1182                        .as_any()
1183                        .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
1184                        .unwrap();
1185                    get_fsb_array_slice(array, indices)
1186                }
1187                ArrowDataType::Decimal128(_, _) => {
1188                    let array = column.as_primitive::<Decimal128Type>();
1189                    get_decimal_128_array_slice(array, indices)
1190                }
1191                ArrowDataType::Decimal256(_, _) => {
1192                    let array = column
1193                        .as_any()
1194                        .downcast_ref::<arrow_array::Decimal256Array>()
1195                        .unwrap();
1196                    get_decimal_256_array_slice(array, indices)
1197                }
1198                ArrowDataType::Float16 => {
1199                    let array = column.as_primitive::<Float16Type>();
1200                    get_float_16_array_slice(array, indices)
1201                }
1202                _ => {
1203                    return Err(ParquetError::NYI(
1204                        "Attempting to write an Arrow type that is not yet implemented".to_string(),
1205                    ));
1206                }
1207            };
1208            typed.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels())
1209        }
1210    }
1211}
1212
1213fn write_primitive<E: ColumnValueEncoder>(
1214    writer: &mut GenericColumnWriter<E>,
1215    values: &E::Values,
1216    levels: &ArrayLevels,
1217) -> Result<usize> {
1218    writer.write_batch_internal(
1219        values,
1220        Some(levels.non_null_indices()),
1221        levels.def_levels(),
1222        levels.rep_levels(),
1223        None,
1224        None,
1225        None,
1226    )
1227}
1228
1229fn get_bool_array_slice(array: &arrow_array::BooleanArray, indices: &[usize]) -> Vec<bool> {
1230    let mut values = Vec::with_capacity(indices.len());
1231    for i in indices {
1232        values.push(array.value(*i))
1233    }
1234    values
1235}
1236
1237/// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each).
1238/// An Arrow YearMonth interval only stores months, thus only the first 4 bytes are populated.
1239fn get_interval_ym_array_slice(
1240    array: &arrow_array::IntervalYearMonthArray,
1241    indices: &[usize],
1242) -> Vec<FixedLenByteArray> {
1243    let mut values = Vec::with_capacity(indices.len());
1244    for i in indices {
1245        let mut value = array.value(*i).to_le_bytes().to_vec();
1246        let mut suffix = vec![0; 8];
1247        value.append(&mut suffix);
1248        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1249    }
1250    values
1251}
1252
1253/// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each).
1254/// An Arrow DayTime interval only stores days and millis, thus the first 4 bytes are not populated.
1255fn get_interval_dt_array_slice(
1256    array: &arrow_array::IntervalDayTimeArray,
1257    indices: &[usize],
1258) -> Vec<FixedLenByteArray> {
1259    let mut values = Vec::with_capacity(indices.len());
1260    for i in indices {
1261        let mut out = [0; 12];
1262        let value = array.value(*i);
1263        out[4..8].copy_from_slice(&value.days.to_le_bytes());
1264        out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1265        values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1266    }
1267    values
1268}
1269
1270fn get_decimal_128_array_slice(
1271    array: &arrow_array::Decimal128Array,
1272    indices: &[usize],
1273) -> Vec<FixedLenByteArray> {
1274    let mut values = Vec::with_capacity(indices.len());
1275    let size = decimal_length_from_precision(array.precision());
1276    for i in indices {
1277        let as_be_bytes = array.value(*i).to_be_bytes();
1278        let resized_value = as_be_bytes[(16 - size)..].to_vec();
1279        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1280    }
1281    values
1282}
1283
1284fn get_decimal_256_array_slice(
1285    array: &arrow_array::Decimal256Array,
1286    indices: &[usize],
1287) -> Vec<FixedLenByteArray> {
1288    let mut values = Vec::with_capacity(indices.len());
1289    let size = decimal_length_from_precision(array.precision());
1290    for i in indices {
1291        let as_be_bytes = array.value(*i).to_be_bytes();
1292        let resized_value = as_be_bytes[(32 - size)..].to_vec();
1293        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1294    }
1295    values
1296}
1297
1298fn get_float_16_array_slice(
1299    array: &arrow_array::Float16Array,
1300    indices: &[usize],
1301) -> Vec<FixedLenByteArray> {
1302    let mut values = Vec::with_capacity(indices.len());
1303    for i in indices {
1304        let value = array.value(*i).to_le_bytes().to_vec();
1305        values.push(FixedLenByteArray::from(ByteArray::from(value)));
1306    }
1307    values
1308}
1309
1310fn get_fsb_array_slice(
1311    array: &arrow_array::FixedSizeBinaryArray,
1312    indices: &[usize],
1313) -> Vec<FixedLenByteArray> {
1314    let mut values = Vec::with_capacity(indices.len());
1315    for i in indices {
1316        let value = array.value(*i).to_vec();
1317        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1318    }
1319    values
1320}
1321
1322#[cfg(test)]
1323mod tests {
1324    use super::*;
1325
1326    use std::fs::File;
1327
1328    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1329    use crate::arrow::ARROW_SCHEMA_META_KEY;
1330    use arrow::datatypes::ToByteSlice;
1331    use arrow::datatypes::{DataType, Schema};
1332    use arrow::error::Result as ArrowResult;
1333    use arrow::util::data_gen::create_random_array;
1334    use arrow::util::pretty::pretty_format_batches;
1335    use arrow::{array::*, buffer::Buffer};
1336    use arrow_buffer::{i256, IntervalDayTime, IntervalMonthDayNano, NullBuffer};
1337    use arrow_schema::Fields;
1338    use half::f16;
1339    use num::{FromPrimitive, ToPrimitive};
1340
1341    use crate::basic::Encoding;
1342    use crate::data_type::AsBytes;
1343    use crate::file::metadata::ParquetMetaData;
1344    use crate::file::page_index::index::Index;
1345    use crate::file::page_index::index_reader::read_offset_indexes;
1346    use crate::file::properties::{
1347        BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1348    };
1349    use crate::file::serialized_reader::ReadOptionsBuilder;
1350    use crate::file::{
1351        reader::{FileReader, SerializedFileReader},
1352        statistics::Statistics,
1353    };
1354
1355    #[test]
1356    fn arrow_writer() {
1357        // define schema
1358        let schema = Schema::new(vec![
1359            Field::new("a", DataType::Int32, false),
1360            Field::new("b", DataType::Int32, true),
1361        ]);
1362
1363        // create some data
1364        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1365        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1366
1367        // build a record batch
1368        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
1369
1370        roundtrip(batch, Some(SMALL_SIZE / 2));
1371    }
1372
1373    fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1374        let mut buffer = vec![];
1375
1376        let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
1377        writer.write(expected_batch).unwrap();
1378        writer.close().unwrap();
1379
1380        buffer
1381    }
1382
1383    fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1384        let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
1385        writer.write(expected_batch).unwrap();
1386        writer.into_inner().unwrap()
1387    }
1388
1389    #[test]
1390    fn roundtrip_bytes() {
1391        // define schema
1392        let schema = Arc::new(Schema::new(vec![
1393            Field::new("a", DataType::Int32, false),
1394            Field::new("b", DataType::Int32, true),
1395        ]));
1396
1397        // create some data
1398        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1399        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1400
1401        // build a record batch
1402        let expected_batch =
1403            RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
1404
1405        for buffer in [
1406            get_bytes_after_close(schema.clone(), &expected_batch),
1407            get_bytes_by_into_inner(schema, &expected_batch),
1408        ] {
1409            let cursor = Bytes::from(buffer);
1410            let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
1411
1412            let actual_batch = record_batch_reader
1413                .next()
1414                .expect("No batch found")
1415                .expect("Unable to get batch");
1416
1417            assert_eq!(expected_batch.schema(), actual_batch.schema());
1418            assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
1419            assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
1420            for i in 0..expected_batch.num_columns() {
1421                let expected_data = expected_batch.column(i).to_data();
1422                let actual_data = actual_batch.column(i).to_data();
1423
1424                assert_eq!(expected_data, actual_data);
1425            }
1426        }
1427    }
1428
1429    #[test]
1430    fn arrow_writer_non_null() {
1431        // define schema
1432        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1433
1434        // create some data
1435        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1436
1437        // build a record batch
1438        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1439
1440        roundtrip(batch, Some(SMALL_SIZE / 2));
1441    }
1442
1443    #[test]
1444    fn arrow_writer_list() {
1445        // define schema
1446        let schema = Schema::new(vec![Field::new(
1447            "a",
1448            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1449            true,
1450        )]);
1451
1452        // create some data
1453        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1454
1455        // Construct a buffer for value offsets, for the nested array:
1456        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
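        // List offsets are cumulative: entry i spans values[offsets[i]..offsets[i + 1]],
        // so the repeated `3` encodes a zero-length slot that the null bitmap below marks as null.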
1457        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1458
1459        // Construct a list array from the above two
1460        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1461            DataType::Int32,
1462            false,
1463        ))))
1464        .len(5)
1465        .add_buffer(a_value_offsets)
1466        .add_child_data(a_values.into_data())
1467        .null_bit_buffer(Some(Buffer::from([0b00011011])))
1468        .build()
1469        .unwrap();
1470        let a = ListArray::from(a_list_data);
1471
1472        // build a record batch
1473        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1474
1475        assert_eq!(batch.column(0).null_count(), 1);
1476
1477        // This test fails if the max row group size is less than the batch's length
1478        // see https://github.com/apache/arrow-rs/issues/518
1479        roundtrip(batch, None);
1480    }
1481
1482    #[test]
1483    fn arrow_writer_list_non_null() {
1484        // define schema
1485        let schema = Schema::new(vec![Field::new(
1486            "a",
1487            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1488            false,
1489        )]);
1490
1491        // create some data
1492        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1493
1494        // Construct a buffer for value offsets, for the nested array:
1495        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
1496        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1497
1498        // Construct a list array from the above two
1499        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1500            DataType::Int32,
1501            false,
1502        ))))
1503        .len(5)
1504        .add_buffer(a_value_offsets)
1505        .add_child_data(a_values.into_data())
1506        .build()
1507        .unwrap();
1508        let a = ListArray::from(a_list_data);
1509
1510        // build a record batch
1511        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1512
1513        // This test fails if the max row group size is less than the batch's length
1514        // see https://github.com/apache/arrow-rs/issues/518
1515        assert_eq!(batch.column(0).null_count(), 0);
1516
1517        roundtrip(batch, None);
1518    }
1519
1520    #[test]
1521    fn arrow_writer_binary() {
1522        let string_field = Field::new("a", DataType::Utf8, false);
1523        let binary_field = Field::new("b", DataType::Binary, false);
1524        let schema = Schema::new(vec![string_field, binary_field]);
1525
1526        let raw_string_values = vec!["foo", "bar", "baz", "quux"];
1527        let raw_binary_values = [
1528            b"foo".to_vec(),
1529            b"bar".to_vec(),
1530            b"baz".to_vec(),
1531            b"quux".to_vec(),
1532        ];
1533        let raw_binary_value_refs = raw_binary_values
1534            .iter()
1535            .map(|x| x.as_slice())
1536            .collect::<Vec<_>>();
1537
1538        let string_values = StringArray::from(raw_string_values.clone());
1539        let binary_values = BinaryArray::from(raw_binary_value_refs);
1540        let batch = RecordBatch::try_new(
1541            Arc::new(schema),
1542            vec![Arc::new(string_values), Arc::new(binary_values)],
1543        )
1544        .unwrap();
1545
1546        roundtrip(batch, Some(SMALL_SIZE / 2));
1547    }
1548
1549    #[test]
1550    fn arrow_writer_binary_view() {
1551        let string_field = Field::new("a", DataType::Utf8View, false);
1552        let binary_field = Field::new("b", DataType::BinaryView, false);
1553        let nullable_string_field = Field::new("a", DataType::Utf8View, true);
1554        let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
1555
1556        let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
1557        let raw_binary_values = vec![
1558            b"foo".to_vec(),
1559            b"bar".to_vec(),
1560            b"large payload over 12 bytes".to_vec(),
1561            b"lulu".to_vec(),
1562        ];
1563        let nullable_string_values =
1564            vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
1565
1566        let string_view_values = StringViewArray::from(raw_string_values);
1567        let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
1568        let nullable_string_view_values = StringViewArray::from(nullable_string_values);
1569        let batch = RecordBatch::try_new(
1570            Arc::new(schema),
1571            vec![
1572                Arc::new(string_view_values),
1573                Arc::new(binary_view_values),
1574                Arc::new(nullable_string_view_values),
1575            ],
1576        )
1577        .unwrap();
1578
1579        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1580        roundtrip(batch, None);
1581    }
1582
1583    fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
1584        let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
1585        let schema = Schema::new(vec![decimal_field]);
1586
1587        let decimal_values = vec![10_000, 50_000, 0, -100]
1588            .into_iter()
1589            .map(Some)
1590            .collect::<Decimal128Array>()
1591            .with_precision_and_scale(precision, scale)
1592            .unwrap();
1593
1594        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
1595    }
1596
1597    #[test]
1598    fn arrow_writer_decimal() {
1599        // int32 to store the decimal value
1600        let batch_int32_decimal = get_decimal_batch(5, 2);
1601        roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
1602        // int64 to store the decimal value
1603        let batch_int64_decimal = get_decimal_batch(12, 2);
1604        roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
1605        // fixed_length_byte_array to store the decimal value
1606        let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
1607        roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
1608    }
1609
1610    #[test]
1611    fn arrow_writer_complex() {
1612        // define schema
1613        let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1614        let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1615        let struct_field_g = Arc::new(Field::new_list(
1616            "g",
1617            Field::new_list_field(DataType::Int16, true),
1618            false,
1619        ));
1620        let struct_field_h = Arc::new(Field::new_list(
1621            "h",
1622            Field::new_list_field(DataType::Int16, false),
1623            true,
1624        ));
1625        let struct_field_e = Arc::new(Field::new_struct(
1626            "e",
1627            vec![
1628                struct_field_f.clone(),
1629                struct_field_g.clone(),
1630                struct_field_h.clone(),
1631            ],
1632            false,
1633        ));
1634        let schema = Schema::new(vec![
1635            Field::new("a", DataType::Int32, false),
1636            Field::new("b", DataType::Int32, true),
1637            Field::new_struct(
1638                "c",
1639                vec![struct_field_d.clone(), struct_field_e.clone()],
1640                false,
1641            ),
1642        ]);
1643
1644        // create some data
1645        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1646        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1647        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1648        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1649
1650        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1651
1652        // Construct a buffer for value offsets, for the nested array:
1653        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
1654        let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1655
1656        // Construct a list array from the above two
1657        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1658            .len(5)
1659            .add_buffer(g_value_offsets.clone())
1660            .add_child_data(g_value.to_data())
1661            .build()
1662            .unwrap();
1663        let g = ListArray::from(g_list_data);
1664        // The difference between g and h is that h has a null bitmap
1665        let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
1666            .len(5)
1667            .add_buffer(g_value_offsets)
1668            .add_child_data(g_value.to_data())
1669            .null_bit_buffer(Some(Buffer::from([0b00011011])))
1670            .build()
1671            .unwrap();
1672        let h = ListArray::from(h_list_data);
1673
1674        let e = StructArray::from(vec![
1675            (struct_field_f, Arc::new(f) as ArrayRef),
1676            (struct_field_g, Arc::new(g) as ArrayRef),
1677            (struct_field_h, Arc::new(h) as ArrayRef),
1678        ]);
1679
1680        let c = StructArray::from(vec![
1681            (struct_field_d, Arc::new(d) as ArrayRef),
1682            (struct_field_e, Arc::new(e) as ArrayRef),
1683        ]);
1684
1685        // build a record batch
1686        let batch = RecordBatch::try_new(
1687            Arc::new(schema),
1688            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
1689        )
1690        .unwrap();
1691
1692        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1693        roundtrip(batch, Some(SMALL_SIZE / 3));
1694    }
1695
1696    #[test]
1697    fn arrow_writer_complex_mixed() {
1698        // This test was added while investigating https://github.com/apache/arrow-rs/issues/244.
1699        // It was subsequently fixed while investigating https://github.com/apache/arrow-rs/issues/245.
1700
1701        // define schema
1702        let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
1703        let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
1704        let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
1705        let schema = Schema::new(vec![Field::new(
1706            "some_nested_object",
1707            DataType::Struct(Fields::from(vec![
1708                offset_field.clone(),
1709                partition_field.clone(),
1710                topic_field.clone(),
1711            ])),
1712            false,
1713        )]);
1714
1715        // create some data
1716        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1717        let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1718        let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
1719
1720        let some_nested_object = StructArray::from(vec![
1721            (offset_field, Arc::new(offset) as ArrayRef),
1722            (partition_field, Arc::new(partition) as ArrayRef),
1723            (topic_field, Arc::new(topic) as ArrayRef),
1724        ]);
1725
1726        // build a record batch
1727        let batch =
1728            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1729
1730        roundtrip(batch, Some(SMALL_SIZE / 2));
1731    }
1732
1733    #[test]
1734    fn arrow_writer_map() {
1735        // Note: we are using the JSON Arrow reader for brevity
1736        let json_content = r#"
1737        {"stocks":{"long": "$AAA", "short": "$BBB"}}
1738        {"stocks":{"long": null, "long": "$CCC", "short": null}}
1739        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1740        "#;
1741        let entries_struct_type = DataType::Struct(Fields::from(vec![
1742            Field::new("key", DataType::Utf8, false),
1743            Field::new("value", DataType::Utf8, true),
1744        ]));
1745        let stocks_field = Field::new(
1746            "stocks",
1747            DataType::Map(
1748                Arc::new(Field::new("entries", entries_struct_type, false)),
1749                false,
1750            ),
1751            true,
1752        );
1753        let schema = Arc::new(Schema::new(vec![stocks_field]));
1754        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
1755        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
1756
1757        let batch = reader.next().unwrap().unwrap();
1758        roundtrip(batch, None);
1759    }
1760
1761    #[test]
1762    fn arrow_writer_2_level_struct() {
1763        // tests writing struct<struct<primitive>>
1764        let field_c = Field::new("c", DataType::Int32, true);
1765        let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
1766        let type_a = DataType::Struct(vec![field_b.clone()].into());
1767        let field_a = Field::new("a", type_a, true);
1768        let schema = Schema::new(vec![field_a.clone()]);
1769
1770        // create data
1771        let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
1772        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
1773            .len(6)
1774            .null_bit_buffer(Some(Buffer::from([0b00100111])))
1775            .add_child_data(c.into_data())
1776            .build()
1777            .unwrap();
1778        let b = StructArray::from(b_data);
1779        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
1780            .len(6)
1781            .null_bit_buffer(Some(Buffer::from([0b00101111])))
1782            .add_child_data(b.into_data())
1783            .build()
1784            .unwrap();
1785        let a = StructArray::from(a_data);
1786
1787        assert_eq!(a.null_count(), 1);
1788        assert_eq!(a.column(0).null_count(), 2);
1789
1790        // build a record batch
1791        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1792
1793        roundtrip(batch, Some(SMALL_SIZE / 2));
1794    }
1795
1796    #[test]
1797    fn arrow_writer_2_level_struct_non_null() {
1798        // tests writing struct<struct<primitive>>
1799        let field_c = Field::new("c", DataType::Int32, false);
1800        let type_b = DataType::Struct(vec![field_c].into());
1801        let field_b = Field::new("b", type_b.clone(), false);
1802        let type_a = DataType::Struct(vec![field_b].into());
1803        let field_a = Field::new("a", type_a.clone(), false);
1804        let schema = Schema::new(vec![field_a]);
1805
1806        // create data
1807        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
1808        let b_data = ArrayDataBuilder::new(type_b)
1809            .len(6)
1810            .add_child_data(c.into_data())
1811            .build()
1812            .unwrap();
1813        let b = StructArray::from(b_data);
1814        let a_data = ArrayDataBuilder::new(type_a)
1815            .len(6)
1816            .add_child_data(b.into_data())
1817            .build()
1818            .unwrap();
1819        let a = StructArray::from(a_data);
1820
1821        assert_eq!(a.null_count(), 0);
1822        assert_eq!(a.column(0).null_count(), 0);
1823
1824        // build a record batch
1825        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1826
1827        roundtrip(batch, Some(SMALL_SIZE / 2));
1828    }
1829
1830    #[test]
1831    fn arrow_writer_2_level_struct_mixed_null() {
1832        // tests writing struct<struct<primitive>>
1833        let field_c = Field::new("c", DataType::Int32, false);
1834        let type_b = DataType::Struct(vec![field_c].into());
1835        let field_b = Field::new("b", type_b.clone(), true);
1836        let type_a = DataType::Struct(vec![field_b].into());
1837        let field_a = Field::new("a", type_a.clone(), false);
1838        let schema = Schema::new(vec![field_a]);
1839
1840        // create data
1841        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
1842        let b_data = ArrayDataBuilder::new(type_b)
1843            .len(6)
1844            .null_bit_buffer(Some(Buffer::from([0b00100111])))
1845            .add_child_data(c.into_data())
1846            .build()
1847            .unwrap();
1848        let b = StructArray::from(b_data);
1849        // a intentionally has no null buffer, to test that this is handled correctly
1850        let a_data = ArrayDataBuilder::new(type_a)
1851            .len(6)
1852            .add_child_data(b.into_data())
1853            .build()
1854            .unwrap();
1855        let a = StructArray::from(a_data);
1856
1857        assert_eq!(a.null_count(), 0);
1858        assert_eq!(a.column(0).null_count(), 2);
1859
1860        // build a record batch
1861        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1862
1863        roundtrip(batch, Some(SMALL_SIZE / 2));
1864    }
1865
1866    #[test]
1867    fn arrow_writer_2_level_struct_mixed_null_2() {
1868        // tests writing struct<struct<primitive>>, where the primitive columns are non-null.
1869        let field_c = Field::new("c", DataType::Int32, false);
1870        let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
1871        let field_e = Field::new(
1872            "e",
1873            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1874            false,
1875        );
1876
1877        let field_b = Field::new(
1878            "b",
1879            DataType::Struct(vec![field_c, field_d, field_e].into()),
1880            false,
1881        );
1882        let type_a = DataType::Struct(vec![field_b.clone()].into());
1883        let field_a = Field::new("a", type_a, true);
1884        let schema = Schema::new(vec![field_a.clone()]);
1885
1886        // create data
1887        let c = Int32Array::from_iter_values(0..6);
1888        let d = FixedSizeBinaryArray::try_from_iter(
1889            ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
1890        )
1891        .expect("four byte values");
1892        let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
1893        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
1894            .len(6)
1895            .add_child_data(c.into_data())
1896            .add_child_data(d.into_data())
1897            .add_child_data(e.into_data())
1898            .build()
1899            .unwrap();
1900        let b = StructArray::from(b_data);
1901        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
1902            .len(6)
1903            .null_bit_buffer(Some(Buffer::from([0b00100101])))
1904            .add_child_data(b.into_data())
1905            .build()
1906            .unwrap();
1907        let a = StructArray::from(a_data);
1908
1909        assert_eq!(a.null_count(), 3);
1910        assert_eq!(a.column(0).null_count(), 0);
1911
1912        // build a record batch
1913        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1914
1915        roundtrip(batch, Some(SMALL_SIZE / 2));
1916    }
1917
1918    #[test]
1919    fn test_fixed_size_binary_in_dict() {
1920        fn test_fixed_size_binary_in_dict_inner<K>()
1921        where
1922            K: ArrowDictionaryKeyType,
1923            K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
1924            <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
1925        {
1926            let field = Field::new(
1927                "a",
1928                DataType::Dictionary(
1929                    Box::new(K::DATA_TYPE),
1930                    Box::new(DataType::FixedSizeBinary(4)),
1931                ),
1932                false,
1933            );
1934            let schema = Schema::new(vec![field]);
1935
1936            let keys: Vec<K::Native> = vec![
1937                K::Native::try_from(0u8).unwrap(),
1938                K::Native::try_from(0u8).unwrap(),
1939                K::Native::try_from(1u8).unwrap(),
1940            ];
1941            let keys = PrimitiveArray::<K>::from_iter_values(keys);
1942            let values = FixedSizeBinaryArray::try_from_iter(
1943                vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
1944            )
1945            .unwrap();
1946
1947            let data = DictionaryArray::<K>::new(keys, Arc::new(values));
1948            let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
1949            roundtrip(batch, None);
1950        }
1951
1952        test_fixed_size_binary_in_dict_inner::<UInt8Type>();
1953        test_fixed_size_binary_in_dict_inner::<UInt16Type>();
1954        test_fixed_size_binary_in_dict_inner::<UInt32Type>();
1955        test_fixed_size_binary_in_dict_inner::<UInt64Type>();
1956        test_fixed_size_binary_in_dict_inner::<Int8Type>();
1957        test_fixed_size_binary_in_dict_inner::<Int16Type>();
1958        test_fixed_size_binary_in_dict_inner::<Int32Type>();
1959        test_fixed_size_binary_in_dict_inner::<Int64Type>();
1960    }
1961
1962    #[test]
1963    fn test_empty_dict() {
1964        let struct_fields = Fields::from(vec![Field::new(
1965            "dict",
1966            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1967            false,
1968        )]);
1969
1970        let schema = Schema::new(vec![Field::new_struct(
1971            "struct",
1972            struct_fields.clone(),
1973            true,
1974        )]);
1975        let dictionary = Arc::new(DictionaryArray::new(
1976            Int32Array::new_null(5),
1977            Arc::new(StringArray::new_null(0)),
1978        ));
1979
1980        let s = StructArray::new(
1981            struct_fields,
1982            vec![dictionary],
1983            Some(NullBuffer::new_null(5)),
1984        );
1985
1986        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
1987        roundtrip(batch, None);
1988    }

1989    #[test]
1990    fn arrow_writer_page_size() {
1991        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
1992
1993        let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
1994
1995        // Generate an array of 10 unique 10-character strings
1996        for i in 0..10 {
1997            let value = i
1998                .to_string()
1999                .repeat(10)
2000                .chars()
2001                .take(10)
2002                .collect::<String>();
2003
2004            builder.append_value(value);
2005        }
2006
2007        let array = Arc::new(builder.finish());
2008
2009        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
2010
2011        let file = tempfile::tempfile().unwrap();
2012
2013        // Set everything very low so we fall back to PLAIN encoding after the first row
2014        let props = WriterProperties::builder()
2015            .set_data_page_size_limit(1)
2016            .set_dictionary_page_size_limit(1)
2017            .set_write_batch_size(1)
2018            .build();
2019
2020        let mut writer =
2021            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
2022                .expect("Unable to write file");
2023        writer.write(&batch).unwrap();
2024        writer.close().unwrap();
2025
2026        let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
2027
2028        let column = reader.metadata().row_group(0).columns();
2029
2030        assert_eq!(column.len(), 1);
2031
2032        // We should write one row before falling back to PLAIN encoding so there should still be a
2033        // dictionary page.
2034        assert!(
2035            column[0].dictionary_page_offset().is_some(),
2036            "Expected a dictionary page"
2037        );
2038
2039        let offset_indexes = read_offset_indexes(&file, column).unwrap().unwrap();
2040
2041        let page_locations = offset_indexes[0].page_locations.clone();
2042
2043        // We should fall back to PLAIN encoding after the first row and our max page size is 1 byte,
2044        // so we expect one dictionary-encoded page and then a page per row thereafter (10 pages total).
2045        assert_eq!(
2046            page_locations.len(),
2047            10,
2048            "Expected 10 pages but got {page_locations:#?}"
2049        );
2050    }
2051
2052    #[test]
2053    fn arrow_writer_float_nans() {
2054        let f16_field = Field::new("a", DataType::Float16, false);
2055        let f32_field = Field::new("b", DataType::Float32, false);
2056        let f64_field = Field::new("c", DataType::Float64, false);
2057        let schema = Schema::new(vec![f16_field, f32_field, f64_field]);
2058
2059        let f16_values = (0..MEDIUM_SIZE)
2060            .map(|i| {
2061                Some(if i % 2 == 0 {
2062                    f16::NAN
2063                } else {
2064                    f16::from_f32(i as f32)
2065                })
2066            })
2067            .collect::<Float16Array>();
2068
2069        let f32_values = (0..MEDIUM_SIZE)
2070            .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
2071            .collect::<Float32Array>();
2072
2073        let f64_values = (0..MEDIUM_SIZE)
2074            .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
2075            .collect::<Float64Array>();
2076
2077        let batch = RecordBatch::try_new(
2078            Arc::new(schema),
2079            vec![
2080                Arc::new(f16_values),
2081                Arc::new(f32_values),
2082                Arc::new(f64_values),
2083            ],
2084        )
2085        .unwrap();
2086
2087        roundtrip(batch, None);
2088    }
2089
2090    const SMALL_SIZE: usize = 7;
2091    const MEDIUM_SIZE: usize = 63;
2092
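    // Round-trips `expected_batch` once per writer version (1.0 and 2.0), optionally capping
    // the row group size, and returns the written temp files.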
2093    fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<File> {
2094        let mut files = vec![];
2095        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2096            let mut props = WriterProperties::builder().set_writer_version(version);
2097
2098            if let Some(size) = max_row_group_size {
2099                props = props.set_max_row_group_size(size)
2100            }
2101
2102            let props = props.build();
2103            files.push(roundtrip_opts(&expected_batch, props))
2104        }
2105        files
2106    }
2107
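    // Writes the batch to a temp file with the given properties, reads it back with
    // ParquetRecordBatchReader, and runs `validate` over each column's expected/actual ArrayData.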
2108    fn roundtrip_opts_with_array_validation<F>(
2109        expected_batch: &RecordBatch,
2110        props: WriterProperties,
2111        validate: F,
2112    ) -> File
2113    where
2114        F: Fn(&ArrayData, &ArrayData),
2115    {
2116        let file = tempfile::tempfile().unwrap();
2117
2118        let mut writer = ArrowWriter::try_new(
2119            file.try_clone().unwrap(),
2120            expected_batch.schema(),
2121            Some(props),
2122        )
2123        .expect("Unable to write file");
2124        writer.write(expected_batch).unwrap();
2125        writer.close().unwrap();
2126
2127        let mut record_batch_reader =
2128            ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
2129
2130        let actual_batch = record_batch_reader
2131            .next()
2132            .expect("No batch found")
2133            .expect("Unable to get batch");
2134
2135        assert_eq!(expected_batch.schema(), actual_batch.schema());
2136        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
2137        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
2138        for i in 0..expected_batch.num_columns() {
2139            let expected_data = expected_batch.column(i).to_data();
2140            let actual_data = actual_batch.column(i).to_data();
2141            validate(&expected_data, &actual_data);
2142        }
2143
2144        file
2145    }
2146
2147    fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> File {
2148        roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
2149            a.validate_full().expect("valid expected data");
2150            b.validate_full().expect("valid actual data");
2151            assert_eq!(a, b)
2152        })
2153    }
2154
2155    struct RoundTripOptions {
2156        values: ArrayRef,
2157        schema: SchemaRef,
2158        bloom_filter: bool,
2159        bloom_filter_position: BloomFilterPosition,
2160    }
2161
2162    impl RoundTripOptions {
2163        fn new(values: ArrayRef, nullable: bool) -> Self {
2164            let data_type = values.data_type().clone();
2165            let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
2166            Self {
2167                values,
2168                schema: Arc::new(schema),
2169                bloom_filter: false,
2170                bloom_filter_position: BloomFilterPosition::AfterRowGroup,
2171            }
2172        }
2173    }
2174
2175    fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<File> {
2176        one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
2177    }
2178
2179    fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<File> {
2180        let mut options = RoundTripOptions::new(values, false);
2181        options.schema = schema;
2182        one_column_roundtrip_with_options(options)
2183    }
2184
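    // Exhaustively round-trips the single column across dictionary page size limits, the
    // candidate encodings for its data type, both writer versions, and several row group sizes.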
2185    fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<File> {
2186        let RoundTripOptions {
2187            values,
2188            schema,
2189            bloom_filter,
2190            bloom_filter_position,
2191        } = options;
2192
2193        let encodings = match values.data_type() {
2194            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
2195                vec![
2196                    Encoding::PLAIN,
2197                    Encoding::DELTA_BYTE_ARRAY,
2198                    Encoding::DELTA_LENGTH_BYTE_ARRAY,
2199                ]
2200            }
2201            DataType::Int64
2202            | DataType::Int32
2203            | DataType::Int16
2204            | DataType::Int8
2205            | DataType::UInt64
2206            | DataType::UInt32
2207            | DataType::UInt16
2208            | DataType::UInt8 => vec![
2209                Encoding::PLAIN,
2210                Encoding::DELTA_BINARY_PACKED,
2211                Encoding::BYTE_STREAM_SPLIT,
2212            ],
2213            DataType::Float32 | DataType::Float64 => {
2214                vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
2215            }
2216            _ => vec![Encoding::PLAIN],
2217        };
2218
2219        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2220
2221        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2222
2223        let mut files = vec![];
2224        for dictionary_size in [0, 1, 1024] {
2225            for encoding in &encodings {
2226                for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2227                    for row_group_size in row_group_sizes {
2228                        let props = WriterProperties::builder()
2229                            .set_writer_version(version)
2230                            .set_max_row_group_size(row_group_size)
2231                            .set_dictionary_enabled(dictionary_size != 0)
2232                            .set_dictionary_page_size_limit(dictionary_size.max(1))
2233                            .set_encoding(*encoding)
2234                            .set_bloom_filter_enabled(bloom_filter)
2235                            .set_bloom_filter_position(bloom_filter_position)
2236                            .build();
2237
2238                        files.push(roundtrip_opts(&expected_batch, props))
2239                    }
2240                }
2241            }
2242        }
2243        files
2244    }
2245
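    // `values_required` round-trips a non-nullable column built from the iterator,
    // `values_optional` first nulls out every even-indexed slot, and
    // `required_and_optional` runs both over the same input.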
2246    fn values_required<A, I>(iter: I) -> Vec<File>
2247    where
2248        A: From<Vec<I::Item>> + Array + 'static,
2249        I: IntoIterator,
2250    {
2251        let raw_values: Vec<_> = iter.into_iter().collect();
2252        let values = Arc::new(A::from(raw_values));
2253        one_column_roundtrip(values, false)
2254    }
2255
2256    fn values_optional<A, I>(iter: I) -> Vec<File>
2257    where
2258        A: From<Vec<Option<I::Item>>> + Array + 'static,
2259        I: IntoIterator,
2260    {
2261        let optional_raw_values: Vec<_> = iter
2262            .into_iter()
2263            .enumerate()
2264            .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
2265            .collect();
2266        let optional_values = Arc::new(A::from(optional_raw_values));
2267        one_column_roundtrip(optional_values, true)
2268    }
2269
2270    fn required_and_optional<A, I>(iter: I)
2271    where
2272        A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
2273        I: IntoIterator + Clone,
2274    {
2275        values_required::<A, I>(iter.clone());
2276        values_optional::<A, I>(iter);
2277    }
2278
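    // Reads bloom filters back from the first written file and asserts that every positive
    // value is reported by some row group's filter while no negative value is reported by any.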
2279    fn check_bloom_filter<T: AsBytes>(
2280        files: Vec<File>,
2281        file_column: String,
2282        positive_values: Vec<T>,
2283        negative_values: Vec<T>,
2284    ) {
2285        files.into_iter().take(1).for_each(|file| {
2286            let file_reader = SerializedFileReader::new_with_options(
2287                file,
2288                ReadOptionsBuilder::new()
2289                    .with_reader_properties(
2290                        ReaderProperties::builder()
2291                            .set_read_bloom_filter(true)
2292                            .build(),
2293                    )
2294                    .build(),
2295            )
2296            .expect("Unable to open file as Parquet");
2297            let metadata = file_reader.metadata();
2298
2299            // Gets bloom filters from all row groups.
2300            let mut bloom_filters: Vec<_> = vec![];
2301            for (ri, row_group) in metadata.row_groups().iter().enumerate() {
2302                if let Some((column_index, _)) = row_group
2303                    .columns()
2304                    .iter()
2305                    .enumerate()
2306                    .find(|(_, column)| column.column_path().string() == file_column)
2307                {
2308                    let row_group_reader = file_reader
2309                        .get_row_group(ri)
2310                        .expect("Unable to read row group");
2311                    if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
2312                        bloom_filters.push(sbbf.clone());
2313                    } else {
2314                        panic!("No bloom filter for column named {file_column} found");
2315                    }
2316                } else {
2317                    panic!("No column named {file_column} found");
2318                }
2319            }
2320
2321            positive_values.iter().for_each(|value| {
2322                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2323                assert!(
2324                    found.is_some(),
2325                    "Value {:?} should be in bloom filter",
2326                    value.as_bytes()
2327                );
2328            });
2329
2330            negative_values.iter().for_each(|value| {
2331                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2332                assert!(
2333                    found.is_none(),
2334                    "Value {:?} should not be in bloom filter",
2335                    value.as_bytes()
2336                );
2337            });
2338        });
2339    }
2340
2341    #[test]
2342    fn all_null_primitive_single_column() {
2343        let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
2344        one_column_roundtrip(values, true);
2345    }

2346    #[test]
2347    fn null_single_column() {
2348        let values = Arc::new(NullArray::new(SMALL_SIZE));
2349        one_column_roundtrip(values, true);
2350        // null arrays are always nullable; a test with non-nullable nulls fails
2351    }
2352
2353    #[test]
2354    fn bool_single_column() {
2355        required_and_optional::<BooleanArray, _>(
2356            [true, false].iter().cycle().copied().take(SMALL_SIZE),
2357        );
2358    }
2359
2360    #[test]
2361    fn bool_large_single_column() {
2362        let values = Arc::new(
2363            [None, Some(true), Some(false)]
2364                .iter()
2365                .cycle()
2366                .copied()
2367                .take(200_000)
2368                .collect::<BooleanArray>(),
2369        );
2370        let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
2371        let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2372        let file = tempfile::tempfile().unwrap();
2373
2374        let mut writer =
2375            ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
2376                .expect("Unable to write file");
2377        writer.write(&expected_batch).unwrap();
2378        writer.close().unwrap();
2379    }
2380
2381    #[test]
2382    fn check_page_offset_index_with_nan() {
2383        let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
2384        let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
2385        let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2386
2387        let mut out = Vec::with_capacity(1024);
2388        let mut writer =
2389            ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
2390        writer.write(&batch).unwrap();
2391        let file_meta_data = writer.close().unwrap();
2392        for row_group in file_meta_data.row_groups {
2393            for column in row_group.columns {
2394                assert!(column.offset_index_offset.is_some());
2395                assert!(column.offset_index_length.is_some());
2396                assert!(column.column_index_offset.is_none());
2397                assert!(column.column_index_length.is_none());
2398            }
2399        }
2400    }
2401
2402    #[test]
2403    fn i8_single_column() {
2404        required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
2405    }
2406
2407    #[test]
2408    fn i16_single_column() {
2409        required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
2410    }
2411
2412    #[test]
2413    fn i32_single_column() {
2414        required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
2415    }
2416
2417    #[test]
2418    fn i64_single_column() {
2419        required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
2420    }
2421
2422    #[test]
2423    fn u8_single_column() {
2424        required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
2425    }
2426
2427    #[test]
2428    fn u16_single_column() {
2429        required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
2430    }
2431
2432    #[test]
2433    fn u32_single_column() {
2434        required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
2435    }
2436
2437    #[test]
2438    fn u64_single_column() {
2439        required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
2440    }
2441
2442    #[test]
2443    fn f32_single_column() {
2444        required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
2445    }
2446
2447    #[test]
2448    fn f64_single_column() {
2449        required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
2450    }
2451
2452    // The timestamp array types don't implement From<Vec<T>> because they need the timezone
2453    // argument, and they also don't support building from a Vec<Option<T>>, so call
2454    // one_column_roundtrip manually instead of calling required_and_optional for these tests.
2455
2456    #[test]
2457    fn timestamp_second_single_column() {
2458        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2459        let values = Arc::new(TimestampSecondArray::from(raw_values));
2460
2461        one_column_roundtrip(values, false);
2462    }
2463
2464    #[test]
2465    fn timestamp_millisecond_single_column() {
2466        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2467        let values = Arc::new(TimestampMillisecondArray::from(raw_values));
2468
2469        one_column_roundtrip(values, false);
2470    }
2471
2472    #[test]
2473    fn timestamp_microsecond_single_column() {
2474        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2475        let values = Arc::new(TimestampMicrosecondArray::from(raw_values));
2476
2477        one_column_roundtrip(values, false);
2478    }
2479
2480    #[test]
2481    fn timestamp_nanosecond_single_column() {
2482        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2483        let values = Arc::new(TimestampNanosecondArray::from(raw_values));
2484
2485        one_column_roundtrip(values, false);
2486    }
2487
2488    #[test]
2489    fn date32_single_column() {
2490        required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
2491    }
2492
2493    #[test]
2494    fn date64_single_column() {
2495        // Date64 must be a multiple of 86400000, see ARROW-10925
2496        required_and_optional::<Date64Array, _>(
2497            (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
2498        );
2499    }
2500
2501    #[test]
2502    fn time32_second_single_column() {
2503        required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
2504    }
2505
2506    #[test]
2507    fn time32_millisecond_single_column() {
2508        required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
2509    }
2510
2511    #[test]
2512    fn time64_microsecond_single_column() {
2513        required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
2514    }
2515
2516    #[test]
2517    fn time64_nanosecond_single_column() {
2518        required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
2519    }
2520
2521    #[test]
2522    fn duration_second_single_column() {
2523        required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
2524    }
2525
2526    #[test]
2527    fn duration_millisecond_single_column() {
2528        required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
2529    }
2530
2531    #[test]
2532    fn duration_microsecond_single_column() {
2533        required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
2534    }
2535
2536    #[test]
2537    fn duration_nanosecond_single_column() {
2538        required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
2539    }
2540
2541    #[test]
2542    fn interval_year_month_single_column() {
2543        required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
2544    }
2545
2546    #[test]
2547    fn interval_day_time_single_column() {
2548        required_and_optional::<IntervalDayTimeArray, _>(vec![
2549            IntervalDayTime::new(0, 1),
2550            IntervalDayTime::new(0, 3),
2551            IntervalDayTime::new(3, -2),
2552            IntervalDayTime::new(-200, 4),
2553        ]);
2554    }
2555
2556    #[test]
2557    #[should_panic(
2558        expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
2559    )]
2560    fn interval_month_day_nano_single_column() {
2561        required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
2562            IntervalMonthDayNano::new(0, 1, 5),
2563            IntervalMonthDayNano::new(0, 3, 2),
2564            IntervalMonthDayNano::new(3, -2, -5),
2565            IntervalMonthDayNano::new(-200, 4, -1),
2566        ]);
2567    }
2568
2569    #[test]
2570    fn binary_single_column() {
2571        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2572        let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2573        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2574
2575        // BinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2576        values_required::<BinaryArray, _>(many_vecs_iter);
2577    }
2578
2579    #[test]
2580    fn binary_view_single_column() {
2581        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2582        let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2583        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2584
2585        // BinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2586        values_required::<BinaryViewArray, _>(many_vecs_iter);
2587    }
2588
2589    #[test]
2590    fn i32_column_bloom_filter_at_end() {
2591        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2592        let mut options = RoundTripOptions::new(array, false);
2593        options.bloom_filter = true;
2594        options.bloom_filter_position = BloomFilterPosition::End;
2595
2596        let files = one_column_roundtrip_with_options(options);
2597        check_bloom_filter(
2598            files,
2599            "col".to_string(),
2600            (0..SMALL_SIZE as i32).collect(),
2601            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2602        );
2603    }
2604
2605    #[test]
2606    fn i32_column_bloom_filter() {
2607        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2608        let mut options = RoundTripOptions::new(array, false);
2609        options.bloom_filter = true;
2610
2611        let files = one_column_roundtrip_with_options(options);
2612        check_bloom_filter(
2613            files,
2614            "col".to_string(),
2615            (0..SMALL_SIZE as i32).collect(),
2616            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2617        );
2618    }
2619
2620    #[test]
2621    fn binary_column_bloom_filter() {
2622        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2623        let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2624        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2625
2626        let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
2627        let mut options = RoundTripOptions::new(array, false);
2628        options.bloom_filter = true;
2629
2630        let files = one_column_roundtrip_with_options(options);
2631        check_bloom_filter(
2632            files,
2633            "col".to_string(),
2634            many_vecs,
2635            vec![vec![(SMALL_SIZE + 1) as u8]],
2636        );
2637    }
2638
2639    #[test]
2640    fn empty_string_null_column_bloom_filter() {
2641        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2642        let raw_strs = raw_values.iter().map(|s| s.as_str());
2643
2644        let array = Arc::new(StringArray::from_iter_values(raw_strs));
2645        let mut options = RoundTripOptions::new(array, false);
2646        options.bloom_filter = true;
2647
2648        let files = one_column_roundtrip_with_options(options);
2649
2650        let optional_raw_values: Vec<_> = raw_values
2651            .iter()
2652            .enumerate()
2653            .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
2654            .collect();
2655        // For null slots, the empty string should not be in the bloom filter.
2656        check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
2657    }
2658
2659    #[test]
2660    fn large_binary_single_column() {
2661        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2662        let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2663        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2664
2665        // LargeBinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2666        values_required::<LargeBinaryArray, _>(many_vecs_iter);
2667    }
2668
2669    #[test]
2670    fn fixed_size_binary_single_column() {
2671        let mut builder = FixedSizeBinaryBuilder::new(4);
2672        builder.append_value(b"0123").unwrap();
2673        builder.append_null();
2674        builder.append_value(b"8910").unwrap();
2675        builder.append_value(b"1112").unwrap();
2676        let array = Arc::new(builder.finish());
2677
2678        one_column_roundtrip(array, true);
2679    }
2680
2681    #[test]
2682    fn string_single_column() {
2683        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2684        let raw_strs = raw_values.iter().map(|s| s.as_str());
2685
2686        required_and_optional::<StringArray, _>(raw_strs);
2687    }
2688
2689    #[test]
2690    fn large_string_single_column() {
2691        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2692        let raw_strs = raw_values.iter().map(|s| s.as_str());
2693
2694        required_and_optional::<LargeStringArray, _>(raw_strs);
2695    }
2696
2697    #[test]
2698    fn string_view_single_column() {
2699        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2700        let raw_strs = raw_values.iter().map(|s| s.as_str());
2701
2702        required_and_optional::<StringViewArray, _>(raw_strs);
2703    }
2704
2705    #[test]
2706    fn null_list_single_column() {
2707        let null_field = Field::new_list_field(DataType::Null, true);
2708        let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);
2709
2710        let schema = Schema::new(vec![list_field]);
2711
2712        // Build [[], null, [null, null]]
2713        let a_values = NullArray::new(2);
2714        let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
2715        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2716            DataType::Null,
2717            true,
2718        ))))
2719        .len(3)
2720        .add_buffer(a_value_offsets)
2721        .null_bit_buffer(Some(Buffer::from([0b00000101])))
2722        .add_child_data(a_values.into_data())
2723        .build()
2724        .unwrap();
2725
2726        let a = ListArray::from(a_list_data);
2727
2728        assert!(a.is_valid(0));
2729        assert!(!a.is_valid(1));
2730        assert!(a.is_valid(2));
2731
2732        assert_eq!(a.value(0).len(), 0);
2733        assert_eq!(a.value(2).len(), 2);
2734        assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);
2735
2736        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2737        roundtrip(batch, None);
2738    }
2739
2740    #[test]
2741    fn list_single_column() {
2742        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2743        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2744        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2745            DataType::Int32,
2746            false,
2747        ))))
2748        .len(5)
2749        .add_buffer(a_value_offsets)
2750        .null_bit_buffer(Some(Buffer::from([0b00011011])))
2751        .add_child_data(a_values.into_data())
2752        .build()
2753        .unwrap();
2754
2755        assert_eq!(a_list_data.null_count(), 1);
2756
2757        let a = ListArray::from(a_list_data);
2758        let values = Arc::new(a);
2759
2760        one_column_roundtrip(values, true);
2761    }
2762
2763    #[test]
2764    fn large_list_single_column() {
2765        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2766        let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
2767        let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
2768            "large_item",
2769            DataType::Int32,
2770            true,
2771        ))))
2772        .len(5)
2773        .add_buffer(a_value_offsets)
2774        .add_child_data(a_values.into_data())
2775        .null_bit_buffer(Some(Buffer::from([0b00011011])))
2776        .build()
2777        .unwrap();
2778
2779        // I think this setup is incorrect because this should pass
2780        assert_eq!(a_list_data.null_count(), 1);
2781
2782        let a = LargeListArray::from(a_list_data);
2783        let values = Arc::new(a);
2784
2785        one_column_roundtrip(values, true);
2786    }
2787
2788    #[test]
2789    fn list_nested_nulls() {
2790        use arrow::datatypes::Int32Type;
2791        let data = vec![
2792            Some(vec![Some(1)]),
2793            Some(vec![Some(2), Some(3)]),
2794            None,
2795            Some(vec![Some(4), Some(5), None]),
2796            Some(vec![None]),
2797            Some(vec![Some(6), Some(7)]),
2798        ];
2799
2800        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
2801        one_column_roundtrip(Arc::new(list), true);
2802
2803        let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
2804        one_column_roundtrip(Arc::new(list), true);
2805    }
2806
2807    #[test]
2808    fn struct_single_column() {
2809        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2810        let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
2811        let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
2812
2813        let values = Arc::new(s);
2814        one_column_roundtrip(values, false);
2815    }
2816
2817    #[test]
2818    fn list_and_map_coerced_names() {
2819        // Create map and list with non-Parquet naming
2820        let list_field =
2821            Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
2822        let map_field = Field::new_map(
2823            "my_map",
2824            "entries",
2825            Field::new("keys", DataType::Int32, false),
2826            Field::new("values", DataType::Int32, true),
2827            false,
2828            true,
2829        );
2830
2831        let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
2832        let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();
2833
2834        let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
2835
2836        // Write data to Parquet but coerce names to match spec
2837        let props = Some(WriterProperties::builder().set_coerce_types(true).build());
2838        let file = tempfile::tempfile().unwrap();
2839        let mut writer =
2840            ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();
2841
2842        let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
2843        writer.write(&batch).unwrap();
2844        let file_metadata = writer.close().unwrap();
2845
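        // The Parquet schema in the file metadata is a flat, depth-first list of
        // SchemaElements: [0] root, [1] my_list, [2] list (repeated group), [3] element,
        // [4] my_map, [5] key_value (repeated group), [6] key, [7] value; the indices
        // below refer to this flattened layout (inferred from the assertions that follow).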
2846        // Coerced name of "item" should be "element"
2847        assert_eq!(file_metadata.schema[3].name, "element");
2848        // Coerced name of "entries" should be "key_value"
2849        assert_eq!(file_metadata.schema[5].name, "key_value");
2850        // Coerced name of "keys" should be "key"
2851        assert_eq!(file_metadata.schema[6].name, "key");
2852        // Coerced name of "values" should be "value"
2853        assert_eq!(file_metadata.schema[7].name, "value");
2854
2855        // Double check schema after reading from the file
2856        let reader = SerializedFileReader::new(file).unwrap();
2857        let file_schema = reader.metadata().file_metadata().schema();
2858        let fields = file_schema.get_fields();
2859        let list_field = &fields[0].get_fields()[0];
2860        assert_eq!(list_field.get_fields()[0].name(), "element");
2861        let map_field = &fields[1].get_fields()[0];
2862        assert_eq!(map_field.name(), "key_value");
2863        assert_eq!(map_field.get_fields()[0].name(), "key");
2864        assert_eq!(map_field.get_fields()[1].name(), "value");
2865    }
2866
2867    #[test]
2868    fn fallback_flush_data_page() {
2869        // Tests that Fallback::flush_data_page clears all buffers correctly
2870        let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
2871        let values = Arc::new(StringArray::from(raw_values));
2872        let encodings = vec![
2873            Encoding::DELTA_BYTE_ARRAY,
2874            Encoding::DELTA_LENGTH_BYTE_ARRAY,
2875        ];
2876        let data_type = values.data_type().clone();
2877        let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
2878        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2879
2880        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2881        let data_page_size_limit: usize = 32;
2882        let write_batch_size: usize = 16;
2883
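        // The tiny data page size limit and write batch size force frequent data page
        // flushes, exercising the fallback (non-dictionary) byte array encoder across
        // several row group sizes and encodings.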
2884        for encoding in &encodings {
2885            for row_group_size in row_group_sizes {
2886                let props = WriterProperties::builder()
2887                    .set_writer_version(WriterVersion::PARQUET_2_0)
2888                    .set_max_row_group_size(row_group_size)
2889                    .set_dictionary_enabled(false)
2890                    .set_encoding(*encoding)
2891                    .set_data_page_size_limit(data_page_size_limit)
2892                    .set_write_batch_size(write_batch_size)
2893                    .build();
2894
2895                roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
2896                    let string_array_a = StringArray::from(a.clone());
2897                    let string_array_b = StringArray::from(b.clone());
2898                    let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
2899                    let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
2900                    assert_eq!(
2901                        vec_a, vec_b,
2902                        "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
2903                    );
2904                });
2905            }
2906        }
2907    }
2908
2909    #[test]
2910    fn arrow_writer_string_dictionary() {
2911        // define schema
2912        #[allow(deprecated)]
2913        let schema = Arc::new(Schema::new(vec![Field::new_dict(
2914            "dictionary",
2915            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2916            true,
2917            42,
2918            true,
2919        )]));
2920
2921        // create some data
2922        let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
2923            .iter()
2924            .copied()
2925            .collect();
2926
2927        // build a record batch
2928        one_column_roundtrip_with_schema(Arc::new(d), schema);
2929    }
2930
2931    #[test]
2932    fn arrow_writer_primitive_dictionary() {
2933        // define schema
2934        #[allow(deprecated)]
2935        let schema = Arc::new(Schema::new(vec![Field::new_dict(
2936            "dictionary",
2937            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
2938            true,
2939            42,
2940            true,
2941        )]));
2942
2943        // create some data
2944        let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
2945        builder.append(12345678).unwrap();
2946        builder.append_null();
2947        builder.append(22345678).unwrap();
2948        builder.append(12345678).unwrap();
2949        let d = builder.finish();
2950
2951        one_column_roundtrip_with_schema(Arc::new(d), schema);
2952    }
2953
2954    #[test]
2955    fn arrow_writer_decimal128_dictionary() {
2956        let integers = vec![12345, 56789, 34567];
2957
2958        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
2959
2960        let values = Decimal128Array::from(integers.clone())
2961            .with_precision_and_scale(5, 2)
2962            .unwrap();
2963
2964        let array = DictionaryArray::new(keys, Arc::new(values));
2965        one_column_roundtrip(Arc::new(array.clone()), true);
2966
2967        let values = Decimal128Array::from(integers)
2968            .with_precision_and_scale(12, 2)
2969            .unwrap();
2970
2971        let array = array.with_values(Arc::new(values));
2972        one_column_roundtrip(Arc::new(array), true);
2973    }
2974
2975    #[test]
2976    fn arrow_writer_decimal256_dictionary() {
2977        let integers = vec![
2978            i256::from_i128(12345),
2979            i256::from_i128(56789),
2980            i256::from_i128(34567),
2981        ];
2982
2983        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
2984
2985        let values = Decimal256Array::from(integers.clone())
2986            .with_precision_and_scale(5, 2)
2987            .unwrap();
2988
2989        let array = DictionaryArray::new(keys, Arc::new(values));
2990        one_column_roundtrip(Arc::new(array.clone()), true);
2991
2992        let values = Decimal256Array::from(integers)
2993            .with_precision_and_scale(12, 2)
2994            .unwrap();
2995
2996        let array = array.with_values(Arc::new(values));
2997        one_column_roundtrip(Arc::new(array), true);
2998    }
2999
3000    #[test]
3001    fn arrow_writer_string_dictionary_unsigned_index() {
3002        // define schema
3003        #[allow(deprecated)]
3004        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3005            "dictionary",
3006            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3007            true,
3008            42,
3009            true,
3010        )]));
3011
3012        // create some data
3013        let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3014            .iter()
3015            .copied()
3016            .collect();
3017
3018        one_column_roundtrip_with_schema(Arc::new(d), schema);
3019    }
3020
3021    #[test]
3022    fn u32_min_max() {
3023        // check values roundtrip through parquet
3024        let src = [
3025            u32::MIN,
3026            u32::MIN + 1,
3027            (i32::MAX as u32) - 1,
3028            i32::MAX as u32,
3029            (i32::MAX as u32) + 1,
3030            u32::MAX - 1,
3031            u32::MAX,
3032        ];
3033        let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
3034        let files = one_column_roundtrip(values, false);
3035
3036        for file in files {
3037            // check statistics are valid
3038            let reader = SerializedFileReader::new(file).unwrap();
3039            let metadata = reader.metadata();
3040
3041            let mut row_offset = 0;
3042            for row_group in metadata.row_groups() {
3043                assert_eq!(row_group.num_columns(), 1);
3044                let column = row_group.column(0);
3045
3046                let num_values = column.num_values() as usize;
3047                let src_slice = &src[row_offset..row_offset + num_values];
3048                row_offset += column.num_values() as usize;
3049
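                // UInt32 is written using the signed INT32 physical type, so the statistics
                // are read back as i32 and reinterpreted as u32 for comparison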
3050                let stats = column.statistics().unwrap();
3051                if let Statistics::Int32(stats) = stats {
3052                    assert_eq!(
3053                        *stats.min_opt().unwrap() as u32,
3054                        *src_slice.iter().min().unwrap()
3055                    );
3056                    assert_eq!(
3057                        *stats.max_opt().unwrap() as u32,
3058                        *src_slice.iter().max().unwrap()
3059                    );
3060                } else {
3061                    panic!("Statistics::Int32 missing")
3062                }
3063            }
3064        }
3065    }
3066
3067    #[test]
3068    fn u64_min_max() {
3069        // check values roundtrip through parquet
3070        let src = [
3071            u64::MIN,
3072            u64::MIN + 1,
3073            (i64::MAX as u64) - 1,
3074            i64::MAX as u64,
3075            (i64::MAX as u64) + 1,
3076            u64::MAX - 1,
3077            u64::MAX,
3078        ];
3079        let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
3080        let files = one_column_roundtrip(values, false);
3081
3082        for file in files {
3083            // check statistics are valid
3084            let reader = SerializedFileReader::new(file).unwrap();
3085            let metadata = reader.metadata();
3086
3087            let mut row_offset = 0;
3088            for row_group in metadata.row_groups() {
3089                assert_eq!(row_group.num_columns(), 1);
3090                let column = row_group.column(0);
3091
3092                let num_values = column.num_values() as usize;
3093                let src_slice = &src[row_offset..row_offset + num_values];
3094                row_offset += column.num_values() as usize;
3095
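                // UInt64 is written using the signed INT64 physical type, so the statistics
                // are read back as i64 and reinterpreted as u64 for comparison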
3096                let stats = column.statistics().unwrap();
3097                if let Statistics::Int64(stats) = stats {
3098                    assert_eq!(
3099                        *stats.min_opt().unwrap() as u64,
3100                        *src_slice.iter().min().unwrap()
3101                    );
3102                    assert_eq!(
3103                        *stats.max_opt().unwrap() as u64,
3104                        *src_slice.iter().max().unwrap()
3105                    );
3106                } else {
3107                    panic!("Statistics::Int64 missing")
3108                }
3109            }
3110        }
3111    }
3112
3113    #[test]
3114    fn statistics_null_counts_only_nulls() {
3115        // check that null-count statistics are correct for columns containing only NULLs
3116        let values = Arc::new(UInt64Array::from(vec![None, None]));
3117        let files = one_column_roundtrip(values, true);
3118
3119        for file in files {
3120            // check statistics are valid
3121            let reader = SerializedFileReader::new(file).unwrap();
3122            let metadata = reader.metadata();
3123            assert_eq!(metadata.num_row_groups(), 1);
3124            let row_group = metadata.row_group(0);
3125            assert_eq!(row_group.num_columns(), 1);
3126            let column = row_group.column(0);
3127            let stats = column.statistics().unwrap();
3128            assert_eq!(stats.null_count_opt(), Some(2));
3129        }
3130    }
3131
3132    #[test]
3133    fn test_list_of_struct_roundtrip() {
3134        // define schema
3135        let int_field = Field::new("a", DataType::Int32, true);
3136        let int_field2 = Field::new("b", DataType::Int32, true);
3137
3138        let int_builder = Int32Builder::with_capacity(10);
3139        let int_builder2 = Int32Builder::with_capacity(10);
3140
3141        let struct_builder = StructBuilder::new(
3142            vec![int_field, int_field2],
3143            vec![Box::new(int_builder), Box::new(int_builder2)],
3144        );
3145        let mut list_builder = ListBuilder::new(struct_builder);
3146
3147        // Construct the following array
3148        // [{a: 1, b: 2}], [], null, [null, null], [{a: null, b: 3}], [{a: 2, b: null}]
3149
3150        // [{a: 1, b: 2}]
3151        let values = list_builder.values();
3152        values
3153            .field_builder::<Int32Builder>(0)
3154            .unwrap()
3155            .append_value(1);
3156        values
3157            .field_builder::<Int32Builder>(1)
3158            .unwrap()
3159            .append_value(2);
3160        values.append(true);
3161        list_builder.append(true);
3162
3163        // []
3164        list_builder.append(true);
3165
3166        // null
3167        list_builder.append(false);
3168
3169        // [null, null]
3170        let values = list_builder.values();
3171        values
3172            .field_builder::<Int32Builder>(0)
3173            .unwrap()
3174            .append_null();
3175        values
3176            .field_builder::<Int32Builder>(1)
3177            .unwrap()
3178            .append_null();
3179        values.append(false);
3180        values
3181            .field_builder::<Int32Builder>(0)
3182            .unwrap()
3183            .append_null();
3184        values
3185            .field_builder::<Int32Builder>(1)
3186            .unwrap()
3187            .append_null();
3188        values.append(false);
3189        list_builder.append(true);
3190
3191        // [{a: null, b: 3}]
3192        let values = list_builder.values();
3193        values
3194            .field_builder::<Int32Builder>(0)
3195            .unwrap()
3196            .append_null();
3197        values
3198            .field_builder::<Int32Builder>(1)
3199            .unwrap()
3200            .append_value(3);
3201        values.append(true);
3202        list_builder.append(true);
3203
3204        // [{a: 2, b: null}]
3205        let values = list_builder.values();
3206        values
3207            .field_builder::<Int32Builder>(0)
3208            .unwrap()
3209            .append_value(2);
3210        values
3211            .field_builder::<Int32Builder>(1)
3212            .unwrap()
3213            .append_null();
3214        values.append(true);
3215        list_builder.append(true);
3216
3217        let array = Arc::new(list_builder.finish());
3218
3219        one_column_roundtrip(array, true);
3220    }
3221
3222    fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
3223        metadata.row_groups().iter().map(|x| x.num_rows()).collect()
3224    }
3225
3226    #[test]
3227    fn test_aggregates_records() {
3228        let arrays = [
3229            Int32Array::from((0..100).collect::<Vec<_>>()),
3230            Int32Array::from((0..50).collect::<Vec<_>>()),
3231            Int32Array::from((200..500).collect::<Vec<_>>()),
3232        ];
3233
3234        let schema = Arc::new(Schema::new(vec![Field::new(
3235            "int",
3236            ArrowDataType::Int32,
3237            false,
3238        )]));
3239
3240        let file = tempfile::tempfile().unwrap();
3241
3242        let props = WriterProperties::builder()
3243            .set_max_row_group_size(200)
3244            .build();
3245
3246        let mut writer =
3247            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
3248
3249        for array in arrays {
3250            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
3251            writer.write(&batch).unwrap();
3252        }
3253
3254        writer.close().unwrap();
3255
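        // 100 + 50 + 300 = 450 rows were written; rows are buffered across batches and a
        // row group is flushed whenever the 200-row limit is reached, giving sizes 200, 200, 50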
3256        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3257        assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);
3258
3259        let batches = builder
3260            .with_batch_size(100)
3261            .build()
3262            .unwrap()
3263            .collect::<ArrowResult<Vec<_>>>()
3264            .unwrap();
3265
3266        assert_eq!(batches.len(), 5);
3267        assert!(batches.iter().all(|x| x.num_columns() == 1));
3268
3269        let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3270
3271        assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
3272
3273        let values: Vec<_> = batches
3274            .iter()
3275            .flat_map(|x| {
3276                x.column(0)
3277                    .as_any()
3278                    .downcast_ref::<Int32Array>()
3279                    .unwrap()
3280                    .values()
3281                    .iter()
3282                    .cloned()
3283            })
3284            .collect();
3285
3286        let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
3287        assert_eq!(&values, &expected_values)
3288    }
3289
3290    #[test]
3291    fn complex_aggregate() {
3292        // Tests aggregating nested data
3293        let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
3294        let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
3295        let struct_a = Arc::new(Field::new(
3296            "struct_a",
3297            DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
3298            true,
3299        ));
3300
3301        let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
3302        let struct_b = Arc::new(Field::new(
3303            "struct_b",
3304            DataType::Struct(vec![list_a.clone()].into()),
3305            false,
3306        ));
3307
3308        let schema = Arc::new(Schema::new(vec![struct_b]));
3309
3310        // create nested data
3311        let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
3312        let field_b_array =
3313            Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);
3314
3315        let struct_a_array = StructArray::from(vec![
3316            (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
3317            (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
3318        ]);
3319
3320        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3321            .len(5)
3322            .add_buffer(Buffer::from_iter(vec![
3323                0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
3324            ]))
3325            .null_bit_buffer(Some(Buffer::from_iter(vec![
3326                true, false, true, false, true,
3327            ])))
3328            .child_data(vec![struct_a_array.into_data()])
3329            .build()
3330            .unwrap();
3331
3332        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3333        let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);
3334
3335        let batch1 =
3336            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3337                .unwrap();
3338
3339        let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
3340        let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);
3341
3342        let struct_a_array = StructArray::from(vec![
3343            (field_a, Arc::new(field_a_array) as ArrayRef),
3344            (field_b, Arc::new(field_b_array) as ArrayRef),
3345        ]);
3346
3347        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3348            .len(2)
3349            .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
3350            .child_data(vec![struct_a_array.into_data()])
3351            .build()
3352            .unwrap();
3353
3354        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3355        let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);
3356
3357        let batch2 =
3358            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3359                .unwrap();
3360
3361        let batches = &[batch1, batch2];
3362
3363        // Verify data is as expected
3364
3365        let expected = r#"
3366            +-------------------------------------------------------------------------------------------------------+
3367            | struct_b                                                                                              |
3368            +-------------------------------------------------------------------------------------------------------+
3369            | {list: [{leaf_a: 1, leaf_b: 1}]}                                                                      |
3370            | {list: }                                                                                              |
3371            | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]}                                               |
3372            | {list: }                                                                                              |
3373            | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]}                                                |
3374            | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
3375            | {list: [{leaf_a: 10, leaf_b: }]}                                                                      |
3376            +-------------------------------------------------------------------------------------------------------+
3377        "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");
3378
3379        let actual = pretty_format_batches(batches).unwrap().to_string();
3380        assert_eq!(actual, expected);
3381
3382        // Write data
3383        let file = tempfile::tempfile().unwrap();
3384        let props = WriterProperties::builder()
3385            .set_max_row_group_size(6)
3386            .build();
3387
3388        let mut writer =
3389            ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
3390
3391        for batch in batches {
3392            writer.write(batch).unwrap();
3393        }
3394        writer.close().unwrap();
3395
3396        // Read Data
3397        // Should have written entire first batch and first row of second to the first row group
3398        // leaving a single row in the second row group
3399
3400        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3401        assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);
3402
3403        let batches = builder
3404            .with_batch_size(2)
3405            .build()
3406            .unwrap()
3407            .collect::<ArrowResult<Vec<_>>>()
3408            .unwrap();
3409
3410        assert_eq!(batches.len(), 4);
3411        let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3412        assert_eq!(&batch_counts, &[2, 2, 2, 1]);
3413
3414        let actual = pretty_format_batches(&batches).unwrap().to_string();
3415        assert_eq!(actual, expected);
3416    }
3417
3418    #[test]
3419    fn test_arrow_writer_metadata() {
3420        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3421        let file_schema = batch_schema.clone().with_metadata(
3422            vec![("foo".to_string(), "bar".to_string())]
3423                .into_iter()
3424                .collect(),
3425        );
3426
3427        let batch = RecordBatch::try_new(
3428            Arc::new(batch_schema),
3429            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3430        )
3431        .unwrap();
3432
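        // The batch schema lacks the file schema's key-value metadata; the writer should
        // still accept the batch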
3433        let mut buf = Vec::with_capacity(1024);
3434        let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
3435        writer.write(&batch).unwrap();
3436        writer.close().unwrap();
3437    }
3438
3439    #[test]
3440    fn test_arrow_writer_nullable() {
3441        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3442        let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
3443        let file_schema = Arc::new(file_schema);
3444
3445        let batch = RecordBatch::try_new(
3446            Arc::new(batch_schema),
3447            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3448        )
3449        .unwrap();
3450
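        // The batch declares the column as non-nullable while the file schema declares it
        // nullable; the write should succeed and the data should read back under the
        // (nullable) file schema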
3451        let mut buf = Vec::with_capacity(1024);
3452        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3453        writer.write(&batch).unwrap();
3454        writer.close().unwrap();
3455
3456        let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
3457        let back = read.next().unwrap().unwrap();
3458        assert_eq!(back.schema(), file_schema);
3459        assert_ne!(back.schema(), batch.schema());
3460        assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
3461    }
3462
3463    #[test]
3464    fn in_progress_accounting() {
3465        // define schema
3466        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3467
3468        // create some data
3469        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3470
3471        // build a record batch
3472        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
3473
3474        let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
3475
3476        // starts empty
3477        assert_eq!(writer.in_progress_size(), 0);
3478        assert_eq!(writer.in_progress_rows(), 0);
3479        assert_eq!(writer.memory_size(), 0);
3480        assert_eq!(writer.bytes_written(), 4); // Initial header
3481        writer.write(&batch).unwrap();
3482
3483        // updated on write
3484        let initial_size = writer.in_progress_size();
3485        assert!(initial_size > 0);
3486        assert_eq!(writer.in_progress_rows(), 5);
3487        let initial_memory = writer.memory_size();
3488        assert!(initial_memory > 0);
3489        // memory estimate is at least as large as the estimated encoded size
3490        assert!(
3491            initial_size <= initial_memory,
3492            "{initial_size} <= {initial_memory}"
3493        );
3494
3495        // updated on second write
3496        writer.write(&batch).unwrap();
3497        assert!(writer.in_progress_size() > initial_size);
3498        assert_eq!(writer.in_progress_rows(), 10);
3499        assert!(writer.memory_size() > initial_memory);
3500        assert!(
3501            writer.in_progress_size() <= writer.memory_size(),
3502            "in_progress_size {} <= memory_size {}",
3503            writer.in_progress_size(),
3504            writer.memory_size()
3505        );
3506
3507        // in progress tracking is cleared, but the overall data written is updated
3508        let pre_flush_bytes_written = writer.bytes_written();
3509        writer.flush().unwrap();
3510        assert_eq!(writer.in_progress_size(), 0);
3511        assert_eq!(writer.memory_size(), 0);
3512        assert!(writer.bytes_written() > pre_flush_bytes_written);
3513
3514        writer.close().unwrap();
3515    }
3516
3517    #[test]
3518    fn test_writer_all_null() {
3519        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3520        let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
3521        let batch = RecordBatch::try_from_iter(vec![
3522            ("a", Arc::new(a) as ArrayRef),
3523            ("b", Arc::new(b) as ArrayRef),
3524        ])
3525        .unwrap();
3526
3527        let mut buf = Vec::with_capacity(1024);
3528        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
3529        writer.write(&batch).unwrap();
3530        writer.close().unwrap();
3531
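        // Read back with the page index to check that the all-null column still produces
        // a data page and a corresponding offset index entry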
3532        let bytes = Bytes::from(buf);
3533        let options = ReadOptionsBuilder::new().with_page_index().build();
3534        let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
3535        let index = reader.metadata().offset_index().unwrap();
3536
3537        assert_eq!(index.len(), 1);
3538        assert_eq!(index[0].len(), 2); // 2 columns
3539        assert_eq!(index[0][0].page_locations().len(), 1); // 1 page
3540        assert_eq!(index[0][1].page_locations().len(), 1); // 1 page
3541    }
3542
3543    #[test]
3544    fn test_disabled_statistics_with_page() {
3545        let file_schema = Schema::new(vec![
3546            Field::new("a", DataType::Utf8, true),
3547            Field::new("b", DataType::Utf8, true),
3548        ]);
3549        let file_schema = Arc::new(file_schema);
3550
3551        let batch = RecordBatch::try_new(
3552            file_schema.clone(),
3553            vec![
3554                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3555                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3556            ],
3557        )
3558        .unwrap();
3559
3560        let props = WriterProperties::builder()
3561            .set_statistics_enabled(EnabledStatistics::None)
3562            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
3563            .build();
3564
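        // An offset index is always written, but a column index should only be produced
        // for column "a", which has page-level statistics enabled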
3565        let mut buf = Vec::with_capacity(1024);
3566        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
3567        writer.write(&batch).unwrap();
3568
3569        let metadata = writer.close().unwrap();
3570        assert_eq!(metadata.row_groups.len(), 1);
3571        let row_group = &metadata.row_groups[0];
3572        assert_eq!(row_group.columns.len(), 2);
3573        // Column "a" has both offset and column index, as requested
3574        assert!(row_group.columns[0].offset_index_offset.is_some());
3575        assert!(row_group.columns[0].column_index_offset.is_some());
3576        // Column "b" should only have offset index
3577        assert!(row_group.columns[1].offset_index_offset.is_some());
3578        assert!(row_group.columns[1].column_index_offset.is_none());
3579
3580        let options = ReadOptionsBuilder::new().with_page_index().build();
3581        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
3582
3583        let row_group = reader.get_row_group(0).unwrap();
3584        let a_col = row_group.metadata().column(0);
3585        let b_col = row_group.metadata().column(1);
3586
3587        // Column chunk of column "a" should have chunk level statistics
3588        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
3589            let min = byte_array_stats.min_opt().unwrap();
3590            let max = byte_array_stats.max_opt().unwrap();
3591
3592            assert_eq!(min.as_bytes(), b"a");
3593            assert_eq!(max.as_bytes(), b"d");
3594        } else {
3595            panic!("expecting Statistics::ByteArray");
3596        }
3597
3598        // The column chunk for column "b" shouldn't have statistics
3599        assert!(b_col.statistics().is_none());
3600
3601        let offset_index = reader.metadata().offset_index().unwrap();
3602        assert_eq!(offset_index.len(), 1); // 1 row group
3603        assert_eq!(offset_index[0].len(), 2); // 2 columns
3604
3605        let column_index = reader.metadata().column_index().unwrap();
3606        assert_eq!(column_index.len(), 1); // 1 row group
3607        assert_eq!(column_index[0].len(), 2); // 2 columns
3608
3609        let a_idx = &column_index[0][0];
3610        assert!(matches!(a_idx, Index::BYTE_ARRAY(_)), "{a_idx:?}");
3611        let b_idx = &column_index[0][1];
3612        assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
3613    }
3614
3615    #[test]
3616    fn test_disabled_statistics_with_chunk() {
3617        let file_schema = Schema::new(vec![
3618            Field::new("a", DataType::Utf8, true),
3619            Field::new("b", DataType::Utf8, true),
3620        ]);
3621        let file_schema = Arc::new(file_schema);
3622
3623        let batch = RecordBatch::try_new(
3624            file_schema.clone(),
3625            vec![
3626                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3627                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3628            ],
3629        )
3630        .unwrap();
3631
3632        let props = WriterProperties::builder()
3633            .set_statistics_enabled(EnabledStatistics::None)
3634            .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
3635            .build();
3636
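        // With chunk-level statistics, min/max values appear in the column chunk metadata
        // for "a", but no column index should be written for either column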
3637        let mut buf = Vec::with_capacity(1024);
3638        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
3639        writer.write(&batch).unwrap();
3640
3641        let metadata = writer.close().unwrap();
3642        assert_eq!(metadata.row_groups.len(), 1);
3643        let row_group = &metadata.row_groups[0];
3644        assert_eq!(row_group.columns.len(), 2);
3645        // Column "a" should only have offset index
3646        assert!(row_group.columns[0].offset_index_offset.is_some());
3647        assert!(row_group.columns[0].column_index_offset.is_none());
3648        // Column "b" should only have offset index
3649        assert!(row_group.columns[1].offset_index_offset.is_some());
3650        assert!(row_group.columns[1].column_index_offset.is_none());
3651
3652        let options = ReadOptionsBuilder::new().with_page_index().build();
3653        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
3654
3655        let row_group = reader.get_row_group(0).unwrap();
3656        let a_col = row_group.metadata().column(0);
3657        let b_col = row_group.metadata().column(1);
3658
3659        // Column chunk of column "a" should have chunk level statistics
3660        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
3661            let min = byte_array_stats.min_opt().unwrap();
3662            let max = byte_array_stats.max_opt().unwrap();
3663
3664            assert_eq!(min.as_bytes(), b"a");
3665            assert_eq!(max.as_bytes(), b"d");
3666        } else {
3667            panic!("expecting Statistics::ByteArray");
3668        }
3669
3670        // The column chunk for column "b" shouldn't have statistics
3671        assert!(b_col.statistics().is_none());
3672
3673        let column_index = reader.metadata().column_index().unwrap();
3674        assert_eq!(column_index.len(), 1); // 1 row group
3675        assert_eq!(column_index[0].len(), 2); // 2 columns
3676
3677        let a_idx = &column_index[0][0];
3678        assert!(matches!(a_idx, Index::NONE), "{a_idx:?}");
3679        let b_idx = &column_index[0][1];
3680        assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
3681    }
3682
3683    #[test]
3684    fn test_arrow_writer_skip_metadata() {
3685        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3686        let file_schema = Arc::new(batch_schema.clone());
3687
3688        let batch = RecordBatch::try_new(
3689            Arc::new(batch_schema),
3690            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3691        )
3692        .unwrap();
3693        let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
3694
3695        let mut buf = Vec::with_capacity(1024);
3696        let mut writer =
3697            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
3698        writer.write(&batch).unwrap();
3699        writer.close().unwrap();
3700
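        // With the Arrow schema metadata skipped, the reader infers an equivalent Arrow
        // schema from the Parquet schema, and no ARROW_SCHEMA_META_KEY entry should be
        // present in the key-value metadata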
3701        let bytes = Bytes::from(buf);
3702        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
3703        assert_eq!(file_schema, *reader_builder.schema());
3704        if let Some(key_value_metadata) = reader_builder
3705            .metadata()
3706            .file_metadata()
3707            .key_value_metadata()
3708        {
3709            assert!(!key_value_metadata
3710                .iter()
3711                .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY));
3712        }
3713    }
3714
3715    #[test]
3716    fn mismatched_schemas() {
3717        let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
3718        let file_schema = Arc::new(Schema::new(vec![Field::new(
3719            "temperature",
3720            DataType::Float64,
3721            false,
3722        )]));
3723
3724        let batch = RecordBatch::try_new(
3725            Arc::new(batch_schema),
3726            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3727        )
3728        .unwrap();
3729
3730        let mut buf = Vec::with_capacity(1024);
3731        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3732
3733        let err = writer.write(&batch).unwrap_err().to_string();
3734        assert_eq!(
3735            err,
3736            "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
3737        );
3738    }
3739
3740    #[test]
3741    // https://github.com/apache/arrow-rs/issues/6988
3742    fn test_roundtrip_empty_schema() {
3743        // create empty record batch with empty schema
3744        let empty_batch = RecordBatch::try_new_with_options(
3745            Arc::new(Schema::empty()),
3746            vec![],
3747            &RecordBatchOptions::default().with_row_count(Some(0)),
3748        )
3749        .unwrap();
3750
3751        // write to parquet
3752        let mut parquet_bytes: Vec<u8> = Vec::new();
3753        let mut writer =
3754            ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
3755        writer.write(&empty_batch).unwrap();
3756        writer.close().unwrap();
3757
3758        // read from parquet
3759        let bytes = Bytes::from(parquet_bytes);
3760        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
3761        assert_eq!(reader.schema(), &empty_batch.schema());
3762        let batches: Vec<_> = reader
3763            .build()
3764            .unwrap()
3765            .collect::<ArrowResult<Vec<_>>>()
3766            .unwrap();
3767        assert_eq!(batches.len(), 0);
3768    }
3769}