parquet/arrow/arrow_writer/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Contains a writer which writes arrow data into parquet data.
19
20use bytes::Bytes;
21use std::io::{Read, Write};
22use std::iter::Peekable;
23use std::slice::Iter;
24use std::sync::{Arc, Mutex};
25use std::vec::IntoIter;
26use thrift::protocol::TCompactOutputProtocol;
27
28use arrow_array::cast::AsArray;
29use arrow_array::types::*;
30use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
31use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};
32
33use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};
34
35use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
36use crate::arrow::ArrowSchemaConverter;
37use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
38use crate::column::page_encryption::PageEncryptor;
39use crate::column::writer::encoder::ColumnValueEncoder;
40use crate::column::writer::{
41    get_column_writer, ColumnCloseResult, ColumnWriter, GenericColumnWriter,
42};
43use crate::data_type::{ByteArray, FixedLenByteArray};
44#[cfg(feature = "encryption")]
45use crate::encryption::encrypt::FileEncryptor;
46use crate::errors::{ParquetError, Result};
47use crate::file::metadata::{KeyValue, RowGroupMetaData};
48use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
49use crate::file::reader::{ChunkReader, Length};
50use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
51use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
52use crate::thrift::TSerializable;
53use levels::{calculate_array_levels, ArrayLevels};
54
55mod byte_array;
56mod levels;
57
58/// Encodes [`RecordBatch`] to parquet
59///
/// Writes Arrow `RecordBatch`es to a Parquet writer. Multiple [`RecordBatch`]es will be encoded
/// into the same row group, up to `max_row_group_size` rows. Any remaining rows will be
/// flushed on close, so the final row group in the output file may
/// contain fewer than `max_row_group_size` rows.
64///
65/// # Example: Writing `RecordBatch`es
66/// ```
67/// # use std::sync::Arc;
68/// # use bytes::Bytes;
69/// # use arrow_array::{ArrayRef, Int64Array};
70/// # use arrow_array::RecordBatch;
71/// # use parquet::arrow::arrow_writer::ArrowWriter;
72/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
73/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
74/// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
75///
76/// let mut buffer = Vec::new();
77/// let mut writer = ArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
78/// writer.write(&to_write).unwrap();
79/// writer.close().unwrap();
80///
81/// let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap();
82/// let read = reader.next().unwrap().unwrap();
83///
84/// assert_eq!(to_write, read);
85/// ```
86///
87/// # Memory Usage and Limiting
88///
89/// The nature of Parquet requires buffering of an entire row group before it can
90/// be flushed to the underlying writer. Data is mostly buffered in its encoded
91/// form, reducing memory usage. However, some data such as dictionary keys,
92/// large strings or very nested data may still result in non-trivial memory
93/// usage.
94///
95/// See Also:
96/// * [`ArrowWriter::memory_size`]: the current memory usage of the writer.
/// * [`ArrowWriter::in_progress_size`]: the estimated size of the buffered row group.
///
/// Call [`Self::flush`] to trigger an early flush of a row group based on a
/// memory threshold and/or global memory pressure. However, smaller row groups
101/// result in higher metadata overheads, and thus may worsen compression ratios
102/// and query performance.
103///
104/// ```no_run
105/// # use std::io::Write;
106/// # use arrow_array::RecordBatch;
107/// # use parquet::arrow::ArrowWriter;
108/// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
109/// # let batch: RecordBatch = todo!();
110/// writer.write(&batch).unwrap();
111/// // Trigger an early flush if anticipated size exceeds 1_000_000
112/// if writer.in_progress_size() > 1_000_000 {
113///     writer.flush().unwrap();
114/// }
115/// ```
116///
117/// ## Type Support
118///
119/// The writer supports writing all Arrow [`DataType`]s that have a direct mapping to
/// Parquet types, including [`StructArray`] and [`ListArray`].
121///
122/// The following are not supported:
123///
124/// * [`IntervalMonthDayNanoArray`]: Parquet does not [support nanosecond intervals].
125///
126/// [`DataType`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html
127/// [`StructArray`]: https://docs.rs/arrow/latest/arrow/array/struct.StructArray.html
128/// [`ListArray`]: https://docs.rs/arrow/latest/arrow/array/type.ListArray.html
129/// [`IntervalMonthDayNanoArray`]: https://docs.rs/arrow/latest/arrow/array/type.IntervalMonthDayNanoArray.html
130/// [support nanosecond intervals]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
131///
132/// ## Type Compatibility
/// The writer can write Arrow [`RecordBatch`]es that are logically equivalent. This means that, for
/// a given column, the writer can accept multiple Arrow [`DataType`]s that contain the same
/// value type.
///
/// Currently, only compatibility between Arrow dictionary and native arrays is supported.
/// Additional type compatibility may be added in the future (see [issue #8012](https://github.com/apache/arrow-rs/issues/8012)).
139/// ```
140/// # use std::sync::Arc;
141/// # use arrow_array::{DictionaryArray, RecordBatch, StringArray, UInt8Array};
142/// # use arrow_schema::{DataType, Field, Schema};
143/// # use parquet::arrow::arrow_writer::ArrowWriter;
144/// let record_batch1 = RecordBatch::try_new(
145///    Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)])),
146///    vec![Arc::new(StringArray::from_iter_values(vec!["a", "b"]))]
147///  )
148/// .unwrap();
149///
150/// let mut buffer = Vec::new();
151/// let mut writer = ArrowWriter::try_new(&mut buffer, record_batch1.schema(), None).unwrap();
152/// writer.write(&record_batch1).unwrap();
153///
154/// let record_batch2 = RecordBatch::try_new(
155///     Arc::new(Schema::new(vec![Field::new(
156///         "col",
157///         DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
158///          false,
159///     )])),
160///     vec![Arc::new(DictionaryArray::new(
161///          UInt8Array::from_iter_values(vec![0, 1]),
162///          Arc::new(StringArray::from_iter_values(vec!["b", "c"])),
163///      ))],
164///  )
165///  .unwrap();
166///  writer.write(&record_batch2).unwrap();
///  writer.close().unwrap();
168/// ```
169pub struct ArrowWriter<W: Write> {
170    /// Underlying Parquet writer
171    writer: SerializedFileWriter<W>,
172
173    /// The in-progress row group if any
174    in_progress: Option<ArrowRowGroupWriter>,
175
176    /// A copy of the Arrow schema.
177    ///
178    /// The schema is used to verify that each record batch written has the correct schema
179    arrow_schema: SchemaRef,
180
181    /// Creates new [`ArrowRowGroupWriter`] instances as required
182    row_group_writer_factory: ArrowRowGroupWriterFactory,
183
    /// The maximum number of rows to write to each row group
185    max_row_group_size: usize,
186}
187
188impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
189    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190        let buffered_memory = self.in_progress_size();
191        f.debug_struct("ArrowWriter")
192            .field("writer", &self.writer)
193            .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
194            .field("in_progress_rows", &self.in_progress_rows())
195            .field("arrow_schema", &self.arrow_schema)
196            .field("max_row_group_size", &self.max_row_group_size)
197            .finish()
198    }
199}
200
201impl<W: Write + Send> ArrowWriter<W> {
202    /// Try to create a new Arrow writer
203    ///
204    /// The writer will fail if:
205    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
206    ///  * the Arrow schema contains unsupported datatypes such as Unions
207    pub fn try_new(
208        writer: W,
209        arrow_schema: SchemaRef,
210        props: Option<WriterProperties>,
211    ) -> Result<Self> {
212        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
213        Self::try_new_with_options(writer, arrow_schema, options)
214    }
215
216    /// Try to create a new Arrow writer with [`ArrowWriterOptions`].
217    ///
218    /// The writer will fail if:
219    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
220    ///  * the Arrow schema contains unsupported datatypes such as Unions
221    pub fn try_new_with_options(
222        writer: W,
223        arrow_schema: SchemaRef,
224        options: ArrowWriterOptions,
225    ) -> Result<Self> {
226        let mut props = options.properties;
227        let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
228        if let Some(schema_root) = &options.schema_root {
229            converter = converter.schema_root(schema_root);
230        }
231        let schema = converter.convert(&arrow_schema)?;
232        if !options.skip_arrow_metadata {
233            // add serialized arrow schema
234            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
235        }
236
237        let max_row_group_size = props.max_row_group_size();
238
239        let props_ptr = Arc::new(props);
240        let file_writer =
241            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;
242
243        let row_group_writer_factory =
244            ArrowRowGroupWriterFactory::new(&file_writer, schema, arrow_schema.clone(), props_ptr);
245
246        Ok(Self {
247            writer: file_writer,
248            in_progress: None,
249            arrow_schema,
250            row_group_writer_factory,
251            max_row_group_size,
252        })
253    }
254
255    /// Returns metadata for any flushed row groups
256    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
257        self.writer.flushed_row_groups()
258    }
259
260    /// Estimated memory usage, in bytes, of this `ArrowWriter`
261    ///
    /// This estimate is formed by summing the values of
    /// [`ArrowColumnWriter::memory_size`] for all in-progress columns.
264    pub fn memory_size(&self) -> usize {
265        match &self.in_progress {
266            Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
267            None => 0,
268        }
269    }
270
271    /// Anticipated encoded size of the in progress row group.
272    ///
    /// This estimates the size of the row group once completely encoded. It is
    /// formed by summing the values of
    /// [`ArrowColumnWriter::get_estimated_total_bytes`] for all in-progress
    /// columns.
277    pub fn in_progress_size(&self) -> usize {
278        match &self.in_progress {
279            Some(in_progress) => in_progress
280                .writers
281                .iter()
282                .map(|x| x.get_estimated_total_bytes())
283                .sum(),
284            None => 0,
285        }
286    }
287
288    /// Returns the number of rows buffered in the in progress row group
289    pub fn in_progress_rows(&self) -> usize {
290        self.in_progress
291            .as_ref()
292            .map(|x| x.buffered_rows)
293            .unwrap_or_default()
294    }
295
296    /// Returns the number of bytes written by this instance
297    pub fn bytes_written(&self) -> usize {
298        self.writer.bytes_written()
299    }
300
301    /// Encodes the provided [`RecordBatch`]
302    ///
303    /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`]
304    /// rows, the contents of `batch` will be written to one or more row groups such that all but
305    /// the final row group in the file contain [`WriterProperties::max_row_group_size`] rows.
306    ///
307    /// This will fail if the `batch`'s schema does not match the writer's schema.
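    ///
    /// A minimal sketch (the row counts and sizes are purely illustrative) showing a
    /// batch being split across row groups once it exceeds the configured
    /// [`WriterProperties::max_row_group_size`]:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
    /// # use parquet::arrow::arrow_writer::ArrowWriter;
    /// # use parquet::file::properties::WriterProperties;
    /// let col = Arc::new(Int64Array::from_iter_values(0..10)) as ArrayRef;
    /// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
    ///
    /// // Limit each row group to at most 4 rows
    /// let props = WriterProperties::builder()
    ///     .set_max_row_group_size(4)
    ///     .build();
    ///
    /// let mut buffer = Vec::new();
    /// let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
    /// writer.write(&batch).unwrap();
    /// let metadata = writer.close().unwrap();
    ///
    /// // 10 rows with at most 4 rows per row group yields 3 row groups
    /// assert_eq!(metadata.row_groups.len(), 3);
    /// ```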
308    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
309        if batch.num_rows() == 0 {
310            return Ok(());
311        }
312
313        let in_progress = match &mut self.in_progress {
314            Some(in_progress) => in_progress,
315            x => x.insert(
316                self.row_group_writer_factory
317                    .create_row_group_writer(self.writer.flushed_row_groups().len())?,
318            ),
319        };
320
        // If this would exceed max_row_group_size, split the batch
322        if in_progress.buffered_rows + batch.num_rows() > self.max_row_group_size {
323            let to_write = self.max_row_group_size - in_progress.buffered_rows;
324            let a = batch.slice(0, to_write);
325            let b = batch.slice(to_write, batch.num_rows() - to_write);
326            self.write(&a)?;
327            return self.write(&b);
328        }
329
330        in_progress.write(batch)?;
331
332        if in_progress.buffered_rows >= self.max_row_group_size {
333            self.flush()?
334        }
335        Ok(())
336    }
337
    /// Writes the given `buf` bytes to the internal buffer.
    ///
    /// It is safe to use this method to write data to the underlying writer,
    /// because it ensures that the buffering and byte-counting layers are used.
342    pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
343        self.writer.write_all(buf)
344    }
345
346    /// Flushes all buffered rows into a new row group
347    pub fn flush(&mut self) -> Result<()> {
348        let in_progress = match self.in_progress.take() {
349            Some(in_progress) => in_progress,
350            None => return Ok(()),
351        };
352
353        let mut row_group_writer = self.writer.next_row_group()?;
354        for chunk in in_progress.close()? {
355            chunk.append_to_row_group(&mut row_group_writer)?;
356        }
357        row_group_writer.close()?;
358        Ok(())
359    }
360
    /// Additional [`KeyValue`] metadata to be written in addition to that configured in [`WriterProperties`]
    ///
    /// This method provides a way to append key-value metadata after writing `RecordBatch`es.
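    ///
    /// A minimal sketch (the key and value strings are purely illustrative):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    /// # use parquet::arrow::arrow_writer::ArrowWriter;
    /// # use parquet::file::metadata::KeyValue;
    /// let col = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
    /// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
    ///
    /// let mut buffer = Vec::new();
    /// let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
    /// writer.write(&batch).unwrap();
    ///
    /// // Append metadata discovered while writing, e.g. row counts or provenance
    /// writer.append_key_value_metadata(KeyValue::new(
    ///     "row_count".to_string(),
    ///     "3".to_string(),
    /// ));
    /// writer.close().unwrap();
    /// ```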
364    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
365        self.writer.append_key_value_metadata(kv_metadata)
366    }
367
368    /// Returns a reference to the underlying writer.
369    pub fn inner(&self) -> &W {
370        self.writer.inner()
371    }
372
373    /// Returns a mutable reference to the underlying writer.
374    ///
    /// **Warning**: if you write directly to this writer, you will skip
    /// the `TrackedWrite` buffering and byte-counting layers. This will cause
    /// the file footer's recorded offsets and sizes to diverge from reality,
    /// resulting in an unreadable or corrupted Parquet file.
379    ///
380    /// If you want to write safely to the underlying writer, use [`Self::write_all`].
381    pub fn inner_mut(&mut self) -> &mut W {
382        self.writer.inner_mut()
383    }
384
385    /// Flushes any outstanding data and returns the underlying writer.
386    pub fn into_inner(mut self) -> Result<W> {
387        self.flush()?;
388        self.writer.into_inner()
389    }
390
391    /// Close and finalize the underlying Parquet writer
392    ///
    /// Unlike [`Self::close`], this does not consume `self`
    ///
    /// Attempting to write after calling `finish` will result in an error
396    pub fn finish(&mut self) -> Result<crate::format::FileMetaData> {
397        self.flush()?;
398        self.writer.finish()
399    }
400
401    /// Close and finalize the underlying Parquet writer
402    pub fn close(mut self) -> Result<crate::format::FileMetaData> {
403        self.finish()
404    }
405
406    /// Create a new row group writer and return its column writers.
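    ///
    /// Together with [`Self::append_row_group`], this allows the columns of a row
    /// group to be encoded outside of this writer. A minimal, single-threaded sketch
    /// (assuming a flat schema, so each field maps to exactly one leaf column):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    /// # use parquet::arrow::arrow_writer::{compute_leaves, ArrowWriter};
    /// let col = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
    /// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
    /// let schema = batch.schema();
    ///
    /// let mut buffer = Vec::new();
    /// let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None).unwrap();
    ///
    /// // Obtain a fresh set of column writers for the next row group
    /// let mut col_writers = writer.get_column_writers().unwrap();
    /// for ((col_writer, field), column) in col_writers
    ///     .iter_mut()
    ///     .zip(schema.fields().iter())
    ///     .zip(batch.columns())
    /// {
    ///     for leaf in compute_leaves(field.as_ref(), column).unwrap() {
    ///         col_writer.write(&leaf).unwrap();
    ///     }
    /// }
    ///
    /// // Close the column writers and append the encoded chunks as a new row group
    /// let chunks: Vec<_> = col_writers.into_iter().map(|w| w.close().unwrap()).collect();
    /// writer.append_row_group(chunks).unwrap();
    /// writer.close().unwrap();
    /// ```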
407    pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
408        self.flush()?;
409        let in_progress = self
410            .row_group_writer_factory
411            .create_row_group_writer(self.writer.flushed_row_groups().len())?;
412        Ok(in_progress.writers)
413    }
414
415    /// Append the given column chunks to the file as a new row group.
416    pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
417        let mut row_group_writer = self.writer.next_row_group()?;
418        for chunk in chunks {
419            chunk.append_to_row_group(&mut row_group_writer)?;
420        }
421        row_group_writer.close()?;
422        Ok(())
423    }
424}
425
426impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
427    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
428        self.write(batch).map_err(|e| e.into())
429    }
430
431    fn close(self) -> std::result::Result<(), ArrowError> {
432        self.close()?;
433        Ok(())
434    }
435}
436
437/// Arrow-specific configuration settings for writing parquet files.
438///
439/// See [`ArrowWriter`] for how to configure the writer.
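///
/// A minimal sketch of constructing options (the values shown are illustrative):
///
/// ```
/// # use parquet::arrow::arrow_writer::ArrowWriterOptions;
/// # use parquet::file::properties::WriterProperties;
/// let options = ArrowWriterOptions::new()
///     .with_properties(WriterProperties::default())
///     .with_skip_arrow_metadata(true)
///     .with_schema_root("root".to_string());
/// // Pass the options to ArrowWriter::try_new_with_options when creating the writer
/// ```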
440#[derive(Debug, Clone, Default)]
441pub struct ArrowWriterOptions {
442    properties: WriterProperties,
443    skip_arrow_metadata: bool,
444    schema_root: Option<String>,
445}
446
447impl ArrowWriterOptions {
448    /// Creates a new [`ArrowWriterOptions`] with the default settings.
449    pub fn new() -> Self {
450        Self::default()
451    }
452
453    /// Sets the [`WriterProperties`] for writing parquet files.
454    pub fn with_properties(self, properties: WriterProperties) -> Self {
455        Self { properties, ..self }
456    }
457
458    /// Skip encoding the embedded arrow metadata (defaults to `false`)
459    ///
    /// Parquet files generated by the [`ArrowWriter`] contain an embedded arrow schema
    /// by default.
    ///
    /// Set `skip_arrow_metadata` to `true` to skip encoding the embedded metadata.
464    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
465        Self {
466            skip_arrow_metadata,
467            ..self
468        }
469    }
470
471    /// Set the name of the root parquet schema element (defaults to `"arrow_schema"`)
472    pub fn with_schema_root(self, schema_root: String) -> Self {
473        Self {
474            schema_root: Some(schema_root),
475            ..self
476        }
477    }
478}
479
480/// A single column chunk produced by [`ArrowColumnWriter`]
481#[derive(Default)]
482struct ArrowColumnChunkData {
483    length: usize,
484    data: Vec<Bytes>,
485}
486
487impl Length for ArrowColumnChunkData {
488    fn len(&self) -> u64 {
489        self.length as _
490    }
491}
492
493impl ChunkReader for ArrowColumnChunkData {
494    type T = ArrowColumnChunkReader;
495
496    fn get_read(&self, start: u64) -> Result<Self::T> {
497        assert_eq!(start, 0); // Assume append_column writes all data in one-shot
498        Ok(ArrowColumnChunkReader(
499            self.data.clone().into_iter().peekable(),
500        ))
501    }
502
503    fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
504        unimplemented!()
505    }
506}
507
508/// A [`Read`] for [`ArrowColumnChunkData`]
509struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
510
511impl Read for ArrowColumnChunkReader {
512    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
513        let buffer = loop {
514            match self.0.peek_mut() {
515                Some(b) if b.is_empty() => {
516                    self.0.next();
517                    continue;
518                }
519                Some(b) => break b,
520                None => return Ok(0),
521            }
522        };
523
524        let len = buffer.len().min(out.len());
525        let b = buffer.split_to(len);
526        out[..len].copy_from_slice(&b);
527        Ok(len)
528    }
529}
530
531/// A shared [`ArrowColumnChunkData`]
532///
533/// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access via
534/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows
535type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
536
537#[derive(Default)]
538struct ArrowPageWriter {
539    buffer: SharedColumnChunk,
540    #[cfg(feature = "encryption")]
541    page_encryptor: Option<PageEncryptor>,
542}
543
544impl ArrowPageWriter {
545    #[cfg(feature = "encryption")]
546    pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
547        self.page_encryptor = page_encryptor;
548        self
549    }
550
551    #[cfg(feature = "encryption")]
552    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
553        self.page_encryptor.as_mut()
554    }
555
556    #[cfg(not(feature = "encryption"))]
557    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
558        None
559    }
560}
561
562impl PageWriter for ArrowPageWriter {
563    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
564        let page = match self.page_encryptor_mut() {
565            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
566            None => page,
567        };
568
569        let page_header = page.to_thrift_header();
570        let header = {
571            let mut header = Vec::with_capacity(1024);
572
573            match self.page_encryptor_mut() {
574                Some(page_encryptor) => {
575                    page_encryptor.encrypt_page_header(&page_header, &mut header)?;
576                    if page.compressed_page().is_data_page() {
577                        page_encryptor.increment_page();
578                    }
579                }
580                None => {
581                    let mut protocol = TCompactOutputProtocol::new(&mut header);
582                    page_header.write_to_out_protocol(&mut protocol)?;
583                }
584            };
585
586            Bytes::from(header)
587        };
588
589        let mut buf = self.buffer.try_lock().unwrap();
590
591        let data = page.compressed_page().buffer().clone();
592        let compressed_size = data.len() + header.len();
593
594        let mut spec = PageWriteSpec::new();
595        spec.page_type = page.page_type();
596        spec.num_values = page.num_values();
597        spec.uncompressed_size = page.uncompressed_size() + header.len();
598        spec.offset = buf.length as u64;
599        spec.compressed_size = compressed_size;
600        spec.bytes_written = compressed_size as u64;
601
602        buf.length += compressed_size;
603        buf.data.push(header);
604        buf.data.push(data);
605
606        Ok(spec)
607    }
608
609    fn close(&mut self) -> Result<()> {
610        Ok(())
611    }
612}
613
614/// A leaf column that can be encoded by [`ArrowColumnWriter`]
615#[derive(Debug)]
616pub struct ArrowLeafColumn(ArrayLevels);
617
618/// Computes the [`ArrowLeafColumn`] for a potentially nested [`ArrayRef`]
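///
/// A minimal sketch for a flat (non-nested) column, which yields exactly one leaf:
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int32Array};
/// # use arrow_schema::{DataType, Field};
/// # use parquet::arrow::arrow_writer::compute_leaves;
/// let field = Field::new("i32", DataType::Int32, false);
/// let array = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
/// let leaves = compute_leaves(&field, &array).unwrap();
/// assert_eq!(leaves.len(), 1);
/// ```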
619pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
620    let levels = calculate_array_levels(array, field)?;
621    Ok(levels.into_iter().map(ArrowLeafColumn).collect())
622}
623
624/// The data for a single column chunk, see [`ArrowColumnWriter`]
625pub struct ArrowColumnChunk {
626    data: ArrowColumnChunkData,
627    close: ColumnCloseResult,
628}
629
630impl std::fmt::Debug for ArrowColumnChunk {
631    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
632        f.debug_struct("ArrowColumnChunk")
633            .field("length", &self.data.length)
634            .finish_non_exhaustive()
635    }
636}
637
638impl ArrowColumnChunk {
639    /// Calls [`SerializedRowGroupWriter::append_column`] with this column's data
640    pub fn append_to_row_group<W: Write + Send>(
641        self,
642        writer: &mut SerializedRowGroupWriter<'_, W>,
643    ) -> Result<()> {
644        writer.append_column(&self.data, self.close)
645    }
646}
647
648/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
649///
/// Note: This is a low-level interface for applications that require
/// fine-grained control of encoding (e.g. encoding using multiple threads);
/// see [`ArrowWriter`] for a higher-level interface
///
/// # Example: Encoding two Arrow Arrays in Parallel
655/// ```
656/// // The arrow schema
657/// # use std::sync::Arc;
658/// # use arrow_array::*;
659/// # use arrow_schema::*;
660/// # use parquet::arrow::ArrowSchemaConverter;
661/// # use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, get_column_writers, ArrowColumnChunk};
662/// # use parquet::file::properties::WriterProperties;
663/// # use parquet::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
664/// #
665/// let schema = Arc::new(Schema::new(vec![
666///     Field::new("i32", DataType::Int32, false),
667///     Field::new("f32", DataType::Float32, false),
668/// ]));
669///
670/// // Compute the parquet schema
671/// let props = Arc::new(WriterProperties::default());
672/// let parquet_schema = ArrowSchemaConverter::new()
673///   .with_coerce_types(props.coerce_types())
674///   .convert(&schema)
675///   .unwrap();
676///
677/// // Create writers for each of the leaf columns
678/// let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
679///
680/// // Spawn a worker thread for each column
681/// //
/// // Note: This is for demonstration purposes; a thread pool (e.g. rayon or tokio) would be better.
/// // The `map` produces an iterator of `(thread handle, send channel)` tuples.
684/// let mut workers: Vec<_> = col_writers
685///     .into_iter()
686///     .map(|mut col_writer| {
687///         let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
688///         let handle = std::thread::spawn(move || {
689///             // receive Arrays to encode via the channel
690///             for col in recv {
691///                 col_writer.write(&col)?;
692///             }
693///             // once the input is complete, close the writer
694///             // to return the newly created ArrowColumnChunk
695///             col_writer.close()
696///         });
697///         (handle, send)
698///     })
699///     .collect();
700///
701/// // Create parquet writer
702/// let root_schema = parquet_schema.root_schema_ptr();
703/// // write to memory in the example, but this could be a File
704/// let mut out = Vec::with_capacity(1024);
705/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone())
706///   .unwrap();
707///
708/// // Start row group
709/// let mut row_group_writer: SerializedRowGroupWriter<'_, _> = writer
710///   .next_row_group()
711///   .unwrap();
712///
713/// // Create some example input columns to encode
714/// let to_write = vec![
715///     Arc::new(Int32Array::from_iter_values([1, 2, 3])) as _,
716///     Arc::new(Float32Array::from_iter_values([1., 45., -1.])) as _,
717/// ];
718///
719/// // Send the input columns to the workers
720/// let mut worker_iter = workers.iter_mut();
721/// for (arr, field) in to_write.iter().zip(&schema.fields) {
722///     for leaves in compute_leaves(field, arr).unwrap() {
723///         worker_iter.next().unwrap().1.send(leaves).unwrap();
724///     }
725/// }
726///
727/// // Wait for the workers to complete encoding, and append
728/// // the resulting column chunks to the row group (and the file)
729/// for (handle, send) in workers {
730///     drop(send); // Drop send side to signal termination
731///     // wait for the worker to send the completed chunk
732///     let chunk: ArrowColumnChunk = handle.join().unwrap().unwrap();
733///     chunk.append_to_row_group(&mut row_group_writer).unwrap();
734/// }
735/// // Close the row group which writes to the underlying file
736/// row_group_writer.close().unwrap();
737///
738/// let metadata = writer.close().unwrap();
739/// assert_eq!(metadata.num_rows, 3);
740/// ```
741pub struct ArrowColumnWriter {
742    writer: ArrowColumnWriterImpl,
743    chunk: SharedColumnChunk,
744}
745
746impl std::fmt::Debug for ArrowColumnWriter {
747    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
748        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
749    }
750}
751
752enum ArrowColumnWriterImpl {
753    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
754    Column(ColumnWriter<'static>),
755}
756
757impl ArrowColumnWriter {
758    /// Write an [`ArrowLeafColumn`]
759    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
760        match &mut self.writer {
761            ArrowColumnWriterImpl::Column(c) => {
762                write_leaf(c, &col.0)?;
763            }
764            ArrowColumnWriterImpl::ByteArray(c) => {
765                write_primitive(c, col.0.array().as_ref(), &col.0)?;
766            }
767        }
768        Ok(())
769    }
770
771    /// Close this column returning the written [`ArrowColumnChunk`]
772    pub fn close(self) -> Result<ArrowColumnChunk> {
773        let close = match self.writer {
774            ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
775            ArrowColumnWriterImpl::Column(c) => c.close()?,
776        };
777        let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
778        let data = chunk.into_inner().unwrap();
779        Ok(ArrowColumnChunk { data, close })
780    }
781
782    /// Returns the estimated total memory usage by the writer.
783    ///
    /// Unlike [`Self::get_estimated_total_bytes`], this is an estimate
    /// of the current memory usage and not its anticipated encoded size.
786    ///
787    /// This includes:
788    /// 1. Data buffered in encoded form
789    /// 2. Data buffered in un-encoded form (e.g. `usize` dictionary keys)
790    ///
791    /// This value should be greater than or equal to [`Self::get_estimated_total_bytes`]
792    pub fn memory_size(&self) -> usize {
793        match &self.writer {
794            ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
795            ArrowColumnWriterImpl::Column(c) => c.memory_size(),
796        }
797    }
798
799    /// Returns the estimated total encoded bytes for this column writer.
800    ///
801    /// This includes:
802    /// 1. Data buffered in encoded form
803    /// 2. An estimate of how large the data buffered in un-encoded form would be once encoded
804    ///
805    /// This value should be less than or equal to [`Self::memory_size`]
806    pub fn get_estimated_total_bytes(&self) -> usize {
807        match &self.writer {
808            ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
809            ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
810        }
811    }
812}
813
814/// Encodes [`RecordBatch`] to a parquet row group
815struct ArrowRowGroupWriter {
816    writers: Vec<ArrowColumnWriter>,
817    schema: SchemaRef,
818    buffered_rows: usize,
819}
820
821impl ArrowRowGroupWriter {
822    fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
823        Self {
824            writers,
825            schema: arrow.clone(),
826            buffered_rows: 0,
827        }
828    }
829
830    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
831        self.buffered_rows += batch.num_rows();
832        let mut writers = self.writers.iter_mut();
833        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
834            for leaf in compute_leaves(field.as_ref(), column)? {
835                writers.next().unwrap().write(&leaf)?
836            }
837        }
838        Ok(())
839    }
840
841    fn close(self) -> Result<Vec<ArrowColumnChunk>> {
842        self.writers
843            .into_iter()
844            .map(|writer| writer.close())
845            .collect()
846    }
847}
848
849struct ArrowRowGroupWriterFactory {
850    schema: SchemaDescriptor,
851    arrow_schema: SchemaRef,
852    props: WriterPropertiesPtr,
853    #[cfg(feature = "encryption")]
854    file_encryptor: Option<Arc<FileEncryptor>>,
855}
856
857impl ArrowRowGroupWriterFactory {
858    #[cfg(feature = "encryption")]
859    fn new<W: Write + Send>(
860        file_writer: &SerializedFileWriter<W>,
861        schema: SchemaDescriptor,
862        arrow_schema: SchemaRef,
863        props: WriterPropertiesPtr,
864    ) -> Self {
865        Self {
866            schema,
867            arrow_schema,
868            props,
869            file_encryptor: file_writer.file_encryptor(),
870        }
871    }
872
873    #[cfg(not(feature = "encryption"))]
874    fn new<W: Write + Send>(
875        _file_writer: &SerializedFileWriter<W>,
876        schema: SchemaDescriptor,
877        arrow_schema: SchemaRef,
878        props: WriterPropertiesPtr,
879    ) -> Self {
880        Self {
881            schema,
882            arrow_schema,
883            props,
884        }
885    }
886
887    #[cfg(feature = "encryption")]
888    fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
889        let writers = get_column_writers_with_encryptor(
890            &self.schema,
891            &self.props,
892            &self.arrow_schema,
893            self.file_encryptor.clone(),
894            row_group_index,
895        )?;
896        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
897    }
898
899    #[cfg(not(feature = "encryption"))]
900    fn create_row_group_writer(&self, _row_group_index: usize) -> Result<ArrowRowGroupWriter> {
901        let writers = get_column_writers(&self.schema, &self.props, &self.arrow_schema)?;
902        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
903    }
904}
905
/// Returns the [`ArrowColumnWriter`]s for a given schema
907pub fn get_column_writers(
908    parquet: &SchemaDescriptor,
909    props: &WriterPropertiesPtr,
910    arrow: &SchemaRef,
911) -> Result<Vec<ArrowColumnWriter>> {
912    let mut writers = Vec::with_capacity(arrow.fields.len());
913    let mut leaves = parquet.columns().iter();
914    let column_factory = ArrowColumnWriterFactory::new();
915    for field in &arrow.fields {
916        column_factory.get_arrow_column_writer(
917            field.data_type(),
918            props,
919            &mut leaves,
920            &mut writers,
921        )?;
922    }
923    Ok(writers)
924}
925
/// Returns the [`ArrowColumnWriter`]s for a given schema, with support for columnar encryption
927#[cfg(feature = "encryption")]
928fn get_column_writers_with_encryptor(
929    parquet: &SchemaDescriptor,
930    props: &WriterPropertiesPtr,
931    arrow: &SchemaRef,
932    file_encryptor: Option<Arc<FileEncryptor>>,
933    row_group_index: usize,
934) -> Result<Vec<ArrowColumnWriter>> {
935    let mut writers = Vec::with_capacity(arrow.fields.len());
936    let mut leaves = parquet.columns().iter();
937    let column_factory =
938        ArrowColumnWriterFactory::new().with_file_encryptor(row_group_index, file_encryptor);
939    for field in &arrow.fields {
940        column_factory.get_arrow_column_writer(
941            field.data_type(),
942            props,
943            &mut leaves,
944            &mut writers,
945        )?;
946    }
947    Ok(writers)
948}
949
950/// Gets [`ArrowColumnWriter`] instances for different data types
951struct ArrowColumnWriterFactory {
952    #[cfg(feature = "encryption")]
953    row_group_index: usize,
954    #[cfg(feature = "encryption")]
955    file_encryptor: Option<Arc<FileEncryptor>>,
956}
957
958impl ArrowColumnWriterFactory {
959    pub fn new() -> Self {
960        Self {
961            #[cfg(feature = "encryption")]
962            row_group_index: 0,
963            #[cfg(feature = "encryption")]
964            file_encryptor: None,
965        }
966    }
967
968    #[cfg(feature = "encryption")]
969    pub fn with_file_encryptor(
970        mut self,
971        row_group_index: usize,
972        file_encryptor: Option<Arc<FileEncryptor>>,
973    ) -> Self {
974        self.row_group_index = row_group_index;
975        self.file_encryptor = file_encryptor;
976        self
977    }
978
979    #[cfg(feature = "encryption")]
980    fn create_page_writer(
981        &self,
982        column_descriptor: &ColumnDescPtr,
983        column_index: usize,
984    ) -> Result<Box<ArrowPageWriter>> {
985        let column_path = column_descriptor.path().string();
986        let page_encryptor = PageEncryptor::create_if_column_encrypted(
987            &self.file_encryptor,
988            self.row_group_index,
989            column_index,
990            &column_path,
991        )?;
992        Ok(Box::new(
993            ArrowPageWriter::default().with_encryptor(page_encryptor),
994        ))
995    }
996
997    #[cfg(not(feature = "encryption"))]
998    fn create_page_writer(
999        &self,
1000        _column_descriptor: &ColumnDescPtr,
1001        _column_index: usize,
1002    ) -> Result<Box<ArrowPageWriter>> {
1003        Ok(Box::<ArrowPageWriter>::default())
1004    }
1005
1006    /// Gets the [`ArrowColumnWriter`] for the given `data_type`
1007    fn get_arrow_column_writer(
1008        &self,
1009        data_type: &ArrowDataType,
1010        props: &WriterPropertiesPtr,
1011        leaves: &mut Iter<'_, ColumnDescPtr>,
1012        out: &mut Vec<ArrowColumnWriter>,
1013    ) -> Result<()> {
1014        let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1015            let page_writer = self.create_page_writer(desc, out.len())?;
1016            let chunk = page_writer.buffer.clone();
1017            let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
1018            Ok(ArrowColumnWriter {
1019                chunk,
1020                writer: ArrowColumnWriterImpl::Column(writer),
1021            })
1022        };
1023
1024        let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1025            let page_writer = self.create_page_writer(desc, out.len())?;
1026            let chunk = page_writer.buffer.clone();
1027            let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
1028            Ok(ArrowColumnWriter {
1029                chunk,
1030                writer: ArrowColumnWriterImpl::ByteArray(writer),
1031            })
1032        };
1033
1034        match data_type {
1035            _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
1036            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => out.push(col(leaves.next().unwrap())?),
1037            ArrowDataType::LargeBinary
1038            | ArrowDataType::Binary
1039            | ArrowDataType::Utf8
1040            | ArrowDataType::LargeUtf8
1041            | ArrowDataType::BinaryView
1042            | ArrowDataType::Utf8View => {
1043                out.push(bytes(leaves.next().unwrap())?)
1044            }
1045            ArrowDataType::List(f)
1046            | ArrowDataType::LargeList(f)
1047            | ArrowDataType::FixedSizeList(f, _) => {
1048                self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
1049            }
1050            ArrowDataType::Struct(fields) => {
1051                for field in fields {
1052                    self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
1053                }
1054            }
1055            ArrowDataType::Map(f, _) => match f.data_type() {
1056                ArrowDataType::Struct(f) => {
1057                    self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
1058                    self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
1059                }
1060                _ => unreachable!("invalid map type"),
1061            }
1062            ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1063                ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Binary | ArrowDataType::LargeBinary => {
1064                    out.push(bytes(leaves.next().unwrap())?)
1065                }
1066                ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
1067                    out.push(bytes(leaves.next().unwrap())?)
1068                }
1069                ArrowDataType::FixedSizeBinary(_) => {
1070                    out.push(bytes(leaves.next().unwrap())?)
1071                }
1072                _ => {
1073                    out.push(col(leaves.next().unwrap())?)
1074                }
1075            }
1076            _ => return Err(ParquetError::NYI(
1077                format!(
1078                    "Attempting to write an Arrow type {data_type:?} to parquet that is not yet implemented"
1079                )
1080            ))
1081        }
1082        Ok(())
1083    }
1084}
1085
1086fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
1087    let column = levels.array().as_ref();
1088    let indices = levels.non_null_indices();
1089    match writer {
1090        ColumnWriter::Int32ColumnWriter(ref mut typed) => {
1091            match column.data_type() {
1092                ArrowDataType::Date64 => {
1093                    // If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
1094                    let array = arrow_cast::cast(column, &ArrowDataType::Date32)?;
1095                    let array = arrow_cast::cast(&array, &ArrowDataType::Int32)?;
1096
1097                    let array = array.as_primitive::<Int32Type>();
1098                    write_primitive(typed, array.values(), levels)
1099                }
1100                ArrowDataType::UInt32 => {
1101                    let values = column.as_primitive::<UInt32Type>().values();
                    // follow C++ implementation and use overflow/reinterpret cast from u32 to i32 which will map
1103                    // `(i32::MAX as u32)..u32::MAX` to `i32::MIN..0`
1104                    let array = values.inner().typed_data::<i32>();
1105                    write_primitive(typed, array, levels)
1106                }
1107                ArrowDataType::Decimal32(_, _) => {
1108                    let array = column
1109                        .as_primitive::<Decimal32Type>()
1110                        .unary::<_, Int32Type>(|v| v);
1111                    write_primitive(typed, array.values(), levels)
1112                }
1113                ArrowDataType::Decimal64(_, _) => {
1114                    // use the int32 to represent the decimal with low precision
1115                    let array = column
1116                        .as_primitive::<Decimal64Type>()
1117                        .unary::<_, Int32Type>(|v| v as i32);
1118                    write_primitive(typed, array.values(), levels)
1119                }
1120                ArrowDataType::Decimal128(_, _) => {
1121                    // use the int32 to represent the decimal with low precision
1122                    let array = column
1123                        .as_primitive::<Decimal128Type>()
1124                        .unary::<_, Int32Type>(|v| v as i32);
1125                    write_primitive(typed, array.values(), levels)
1126                }
1127                ArrowDataType::Decimal256(_, _) => {
1128                    // use the int32 to represent the decimal with low precision
1129                    let array = column
1130                        .as_primitive::<Decimal256Type>()
1131                        .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1132                    write_primitive(typed, array.values(), levels)
1133                }
1134                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1135                    ArrowDataType::Decimal32(_, _) => {
1136                        let array = arrow_cast::cast(column, value_type)?;
1137                        let array = array
1138                            .as_primitive::<Decimal32Type>()
1139                            .unary::<_, Int32Type>(|v| v);
1140                        write_primitive(typed, array.values(), levels)
1141                    }
1142                    ArrowDataType::Decimal64(_, _) => {
1143                        let array = arrow_cast::cast(column, value_type)?;
1144                        let array = array
1145                            .as_primitive::<Decimal64Type>()
1146                            .unary::<_, Int32Type>(|v| v as i32);
1147                        write_primitive(typed, array.values(), levels)
1148                    }
1149                    ArrowDataType::Decimal128(_, _) => {
1150                        let array = arrow_cast::cast(column, value_type)?;
1151                        let array = array
1152                            .as_primitive::<Decimal128Type>()
1153                            .unary::<_, Int32Type>(|v| v as i32);
1154                        write_primitive(typed, array.values(), levels)
1155                    }
1156                    ArrowDataType::Decimal256(_, _) => {
1157                        let array = arrow_cast::cast(column, value_type)?;
1158                        let array = array
1159                            .as_primitive::<Decimal256Type>()
1160                            .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1161                        write_primitive(typed, array.values(), levels)
1162                    }
1163                    _ => {
1164                        let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
1165                        let array = array.as_primitive::<Int32Type>();
1166                        write_primitive(typed, array.values(), levels)
1167                    }
1168                },
1169                _ => {
1170                    let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
1171                    let array = array.as_primitive::<Int32Type>();
1172                    write_primitive(typed, array.values(), levels)
1173                }
1174            }
1175        }
1176        ColumnWriter::BoolColumnWriter(ref mut typed) => {
1177            let array = column.as_boolean();
1178            typed.write_batch(
1179                get_bool_array_slice(array, indices).as_slice(),
1180                levels.def_levels(),
1181                levels.rep_levels(),
1182            )
1183        }
1184        ColumnWriter::Int64ColumnWriter(ref mut typed) => {
1185            match column.data_type() {
1186                ArrowDataType::Date64 => {
1187                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1188
1189                    let array = array.as_primitive::<Int64Type>();
1190                    write_primitive(typed, array.values(), levels)
1191                }
1192                ArrowDataType::Int64 => {
1193                    let array = column.as_primitive::<Int64Type>();
1194                    write_primitive(typed, array.values(), levels)
1195                }
1196                ArrowDataType::UInt64 => {
1197                    let values = column.as_primitive::<UInt64Type>().values();
                    // follow C++ implementation and use overflow/reinterpret cast from u64 to i64 which will map
1199                    // `(i64::MAX as u64)..u64::MAX` to `i64::MIN..0`
1200                    let array = values.inner().typed_data::<i64>();
1201                    write_primitive(typed, array, levels)
1202                }
1203                ArrowDataType::Decimal64(_, _) => {
1204                    let array = column
1205                        .as_primitive::<Decimal64Type>()
1206                        .unary::<_, Int64Type>(|v| v);
1207                    write_primitive(typed, array.values(), levels)
1208                }
1209                ArrowDataType::Decimal128(_, _) => {
1210                    // use the int64 to represent the decimal with low precision
1211                    let array = column
1212                        .as_primitive::<Decimal128Type>()
1213                        .unary::<_, Int64Type>(|v| v as i64);
1214                    write_primitive(typed, array.values(), levels)
1215                }
1216                ArrowDataType::Decimal256(_, _) => {
1217                    // use the int64 to represent the decimal with low precision
1218                    let array = column
1219                        .as_primitive::<Decimal256Type>()
1220                        .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1221                    write_primitive(typed, array.values(), levels)
1222                }
1223                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1224                    ArrowDataType::Decimal64(_, _) => {
1225                        let array = arrow_cast::cast(column, value_type)?;
1226                        let array = array
1227                            .as_primitive::<Decimal64Type>()
1228                            .unary::<_, Int64Type>(|v| v);
1229                        write_primitive(typed, array.values(), levels)
1230                    }
1231                    ArrowDataType::Decimal128(_, _) => {
1232                        let array = arrow_cast::cast(column, value_type)?;
1233                        let array = array
1234                            .as_primitive::<Decimal128Type>()
1235                            .unary::<_, Int64Type>(|v| v as i64);
1236                        write_primitive(typed, array.values(), levels)
1237                    }
1238                    ArrowDataType::Decimal256(_, _) => {
1239                        let array = arrow_cast::cast(column, value_type)?;
1240                        let array = array
1241                            .as_primitive::<Decimal256Type>()
1242                            .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1243                        write_primitive(typed, array.values(), levels)
1244                    }
1245                    _ => {
1246                        let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1247                        let array = array.as_primitive::<Int64Type>();
1248                        write_primitive(typed, array.values(), levels)
1249                    }
1250                },
1251                _ => {
1252                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1253                    let array = array.as_primitive::<Int64Type>();
1254                    write_primitive(typed, array.values(), levels)
1255                }
1256            }
1257        }
1258        ColumnWriter::Int96ColumnWriter(ref mut _typed) => {
1259            unreachable!("Currently unreachable because data type not supported")
1260        }
1261        ColumnWriter::FloatColumnWriter(ref mut typed) => {
1262            let array = column.as_primitive::<Float32Type>();
1263            write_primitive(typed, array.values(), levels)
1264        }
1265        ColumnWriter::DoubleColumnWriter(ref mut typed) => {
1266            let array = column.as_primitive::<Float64Type>();
1267            write_primitive(typed, array.values(), levels)
1268        }
1269        ColumnWriter::ByteArrayColumnWriter(_) => {
1270            unreachable!("should use ByteArrayWriter")
1271        }
1272        ColumnWriter::FixedLenByteArrayColumnWriter(ref mut typed) => {
1273            let bytes = match column.data_type() {
1274                ArrowDataType::Interval(interval_unit) => match interval_unit {
1275                    IntervalUnit::YearMonth => {
1276                        let array = column
1277                            .as_any()
1278                            .downcast_ref::<arrow_array::IntervalYearMonthArray>()
1279                            .unwrap();
1280                        get_interval_ym_array_slice(array, indices)
1281                    }
1282                    IntervalUnit::DayTime => {
1283                        let array = column
1284                            .as_any()
1285                            .downcast_ref::<arrow_array::IntervalDayTimeArray>()
1286                            .unwrap();
1287                        get_interval_dt_array_slice(array, indices)
1288                    }
1289                    _ => {
1290                        return Err(ParquetError::NYI(
1291                            format!(
1292                                "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
1293                            )
1294                        ));
1295                    }
1296                },
1297                ArrowDataType::FixedSizeBinary(_) => {
1298                    let array = column
1299                        .as_any()
1300                        .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
1301                        .unwrap();
1302                    get_fsb_array_slice(array, indices)
1303                }
1304                ArrowDataType::Decimal32(_, _) => {
1305                    let array = column.as_primitive::<Decimal32Type>();
1306                    get_decimal_32_array_slice(array, indices)
1307                }
1308                ArrowDataType::Decimal64(_, _) => {
1309                    let array = column.as_primitive::<Decimal64Type>();
1310                    get_decimal_64_array_slice(array, indices)
1311                }
1312                ArrowDataType::Decimal128(_, _) => {
1313                    let array = column.as_primitive::<Decimal128Type>();
1314                    get_decimal_128_array_slice(array, indices)
1315                }
1316                ArrowDataType::Decimal256(_, _) => {
1317                    let array = column
1318                        .as_any()
1319                        .downcast_ref::<arrow_array::Decimal256Array>()
1320                        .unwrap();
1321                    get_decimal_256_array_slice(array, indices)
1322                }
1323                ArrowDataType::Float16 => {
1324                    let array = column.as_primitive::<Float16Type>();
1325                    get_float_16_array_slice(array, indices)
1326                }
1327                _ => {
1328                    return Err(ParquetError::NYI(
1329                        "Attempting to write an Arrow type that is not yet implemented".to_string(),
1330                    ));
1331                }
1332            };
1333            typed.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels())
1334        }
1335    }
1336}
1337
1338fn write_primitive<E: ColumnValueEncoder>(
1339    writer: &mut GenericColumnWriter<E>,
1340    values: &E::Values,
1341    levels: &ArrayLevels,
1342) -> Result<usize> {
1343    writer.write_batch_internal(
1344        values,
1345        Some(levels.non_null_indices()),
1346        levels.def_levels(),
1347        levels.rep_levels(),
1348        None,
1349        None,
1350        None,
1351    )
1352}
1353
1354fn get_bool_array_slice(array: &arrow_array::BooleanArray, indices: &[usize]) -> Vec<bool> {
1355    let mut values = Vec::with_capacity(indices.len());
1356    for i in indices {
1357        values.push(array.value(*i))
1358    }
1359    values
1360}
1361
1362/// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each).
1363/// An Arrow YearMonth interval only stores months, thus only the first 4 bytes are populated.
1364fn get_interval_ym_array_slice(
1365    array: &arrow_array::IntervalYearMonthArray,
1366    indices: &[usize],
1367) -> Vec<FixedLenByteArray> {
1368    let mut values = Vec::with_capacity(indices.len());
1369    for i in indices {
1370        let mut value = array.value(*i).to_le_bytes().to_vec();
1371        let mut suffix = vec![0; 8];
1372        value.append(&mut suffix);
1373        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1374    }
1375    values
1376}
1377
1378/// Returns 12-byte values representing 3 values of months, days and milliseconds (4 bytes each).
1379/// An Arrow DayTime interval only stores days and millis, thus the first 4 bytes are not populated.
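///
/// For example, `IntervalDayTime { days: 3, milliseconds: -2 }` becomes the 12 bytes
/// `00 00 00 00 | 03 00 00 00 | FE FF FF FF` (an illustrative sketch of the layout).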
1380fn get_interval_dt_array_slice(
1381    array: &arrow_array::IntervalDayTimeArray,
1382    indices: &[usize],
1383) -> Vec<FixedLenByteArray> {
1384    let mut values = Vec::with_capacity(indices.len());
1385    for i in indices {
1386        let mut out = [0; 12];
1387        let value = array.value(*i);
1388        out[4..8].copy_from_slice(&value.days.to_le_bytes());
1389        out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1390        values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1391    }
1392    values
1393}
1394
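/// Converts `Decimal32` values into big-endian `FixedLenByteArray`s whose length is derived
/// from the column's precision via `decimal_length_from_precision`.
///
/// Illustrative sketch (assuming precision 5 maps to 3 bytes): the value `10_000` (`0x2710`)
/// has the big-endian i32 bytes `[0x00, 0x00, 0x27, 0x10]`, and only the trailing 3 bytes
/// `[0x00, 0x27, 0x10]` are written. The 64-, 128- and 256-bit variants below follow the
/// same pattern over 8-, 16- and 32-byte sources.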
1395fn get_decimal_32_array_slice(
1396    array: &arrow_array::Decimal32Array,
1397    indices: &[usize],
1398) -> Vec<FixedLenByteArray> {
1399    let mut values = Vec::with_capacity(indices.len());
1400    let size = decimal_length_from_precision(array.precision());
1401    for i in indices {
1402        let as_be_bytes = array.value(*i).to_be_bytes();
1403        let resized_value = as_be_bytes[(4 - size)..].to_vec();
1404        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1405    }
1406    values
1407}
1408
1409fn get_decimal_64_array_slice(
1410    array: &arrow_array::Decimal64Array,
1411    indices: &[usize],
1412) -> Vec<FixedLenByteArray> {
1413    let mut values = Vec::with_capacity(indices.len());
1414    let size = decimal_length_from_precision(array.precision());
1415    for i in indices {
1416        let as_be_bytes = array.value(*i).to_be_bytes();
1417        let resized_value = as_be_bytes[(8 - size)..].to_vec();
1418        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1419    }
1420    values
1421}
1422
1423fn get_decimal_128_array_slice(
1424    array: &arrow_array::Decimal128Array,
1425    indices: &[usize],
1426) -> Vec<FixedLenByteArray> {
1427    let mut values = Vec::with_capacity(indices.len());
1428    let size = decimal_length_from_precision(array.precision());
1429    for i in indices {
1430        let as_be_bytes = array.value(*i).to_be_bytes();
1431        let resized_value = as_be_bytes[(16 - size)..].to_vec();
1432        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1433    }
1434    values
1435}
1436
1437fn get_decimal_256_array_slice(
1438    array: &arrow_array::Decimal256Array,
1439    indices: &[usize],
1440) -> Vec<FixedLenByteArray> {
1441    let mut values = Vec::with_capacity(indices.len());
1442    let size = decimal_length_from_precision(array.precision());
1443    for i in indices {
1444        let as_be_bytes = array.value(*i).to_be_bytes();
1445        let resized_value = as_be_bytes[(32 - size)..].to_vec();
1446        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1447    }
1448    values
1449}
1450
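/// Returns 2-byte little-endian `FixedLenByteArray` values for the `Float16` entries at `indices`.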
1451fn get_float_16_array_slice(
1452    array: &arrow_array::Float16Array,
1453    indices: &[usize],
1454) -> Vec<FixedLenByteArray> {
1455    let mut values = Vec::with_capacity(indices.len());
1456    for i in indices {
1457        let value = array.value(*i).to_le_bytes().to_vec();
1458        values.push(FixedLenByteArray::from(ByteArray::from(value)));
1459    }
1460    values
1461}
1462
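/// Copies each fixed-size binary value at `indices` into a `FixedLenByteArray`.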
1463fn get_fsb_array_slice(
1464    array: &arrow_array::FixedSizeBinaryArray,
1465    indices: &[usize],
1466) -> Vec<FixedLenByteArray> {
1467    let mut values = Vec::with_capacity(indices.len());
1468    for i in indices {
1469        let value = array.value(*i).to_vec();
1470        values.push(FixedLenByteArray::from(ByteArray::from(value)))
1471    }
1472    values
1473}
1474
1475#[cfg(test)]
1476mod tests {
1477    use super::*;
1478
1479    use std::fs::File;
1480    use std::io::Seek;
1481
1482    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1483    use crate::arrow::ARROW_SCHEMA_META_KEY;
1484    use crate::column::page::{Page, PageReader};
1485    use crate::file::page_encoding_stats::PageEncodingStats;
1486    use crate::file::reader::SerializedPageReader;
1487    use crate::format::PageHeader;
1488    use crate::schema::types::ColumnPath;
1489    use crate::thrift::TCompactSliceInputProtocol;
1490    use arrow::datatypes::ToByteSlice;
1491    use arrow::datatypes::{DataType, Schema};
1492    use arrow::error::Result as ArrowResult;
1493    use arrow::util::data_gen::create_random_array;
1494    use arrow::util::pretty::pretty_format_batches;
1495    use arrow::{array::*, buffer::Buffer};
1496    use arrow_buffer::{i256, IntervalDayTime, IntervalMonthDayNano, NullBuffer};
1497    use arrow_schema::Fields;
1498    use half::f16;
1499    use num::{FromPrimitive, ToPrimitive};
1500    use tempfile::tempfile;
1501
1502    use crate::basic::Encoding;
1503    use crate::data_type::AsBytes;
1504    use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
1505    use crate::file::page_index::index::Index;
1506    use crate::file::properties::{
1507        BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1508    };
1509    use crate::file::serialized_reader::ReadOptionsBuilder;
1510    use crate::file::{
1511        reader::{FileReader, SerializedFileReader},
1512        statistics::Statistics,
1513    };
1514
1515    #[test]
1516    fn arrow_writer() {
1517        // define schema
1518        let schema = Schema::new(vec![
1519            Field::new("a", DataType::Int32, false),
1520            Field::new("b", DataType::Int32, true),
1521        ]);
1522
1523        // create some data
1524        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1525        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1526
1527        // build a record batch
1528        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
1529
1530        roundtrip(batch, Some(SMALL_SIZE / 2));
1531    }
1532
1533    fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1534        let mut buffer = vec![];
1535
1536        let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
1537        writer.write(expected_batch).unwrap();
1538        writer.close().unwrap();
1539
1540        buffer
1541    }
1542
1543    fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1544        let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
1545        writer.write(expected_batch).unwrap();
1546        writer.into_inner().unwrap()
1547    }
1548
1549    #[test]
1550    fn roundtrip_bytes() {
1551        // define schema
1552        let schema = Arc::new(Schema::new(vec![
1553            Field::new("a", DataType::Int32, false),
1554            Field::new("b", DataType::Int32, true),
1555        ]));
1556
1557        // create some data
1558        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1559        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1560
1561        // build a record batch
1562        let expected_batch =
1563            RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
1564
1565        for buffer in [
1566            get_bytes_after_close(schema.clone(), &expected_batch),
1567            get_bytes_by_into_inner(schema, &expected_batch),
1568        ] {
1569            let cursor = Bytes::from(buffer);
1570            let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
1571
1572            let actual_batch = record_batch_reader
1573                .next()
1574                .expect("No batch found")
1575                .expect("Unable to get batch");
1576
1577            assert_eq!(expected_batch.schema(), actual_batch.schema());
1578            assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
1579            assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
1580            for i in 0..expected_batch.num_columns() {
1581                let expected_data = expected_batch.column(i).to_data();
1582                let actual_data = actual_batch.column(i).to_data();
1583
1584                assert_eq!(expected_data, actual_data);
1585            }
1586        }
1587    }
1588
1589    #[test]
1590    fn arrow_writer_non_null() {
1591        // define schema
1592        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1593
1594        // create some data
1595        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1596
1597        // build a record batch
1598        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1599
1600        roundtrip(batch, Some(SMALL_SIZE / 2));
1601    }
1602
1603    #[test]
1604    fn arrow_writer_list() {
1605        // define schema
1606        let schema = Schema::new(vec![Field::new(
1607            "a",
1608            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1609            true,
1610        )]);
1611
1612        // create some data
1613        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1614
1615        // Construct a buffer for value offsets, for the nested array:
1616        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
1617        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1618
1619        // Construct a list array from the above two
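        // Validity bitmaps are LSB-first, so 0b00011011 below marks slots 0, 1, 3 and 4 as valid
        // and slot 2 as null, matching the `null` entry in the nested array above.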
1620        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1621            DataType::Int32,
1622            false,
1623        ))))
1624        .len(5)
1625        .add_buffer(a_value_offsets)
1626        .add_child_data(a_values.into_data())
1627        .null_bit_buffer(Some(Buffer::from([0b00011011])))
1628        .build()
1629        .unwrap();
1630        let a = ListArray::from(a_list_data);
1631
1632        // build a record batch
1633        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1634
1635        assert_eq!(batch.column(0).null_count(), 1);
1636
1637        // This test fails if the max row group size is less than the batch's length
1638        // see https://github.com/apache/arrow-rs/issues/518
1639        roundtrip(batch, None);
1640    }
1641
1642    #[test]
1643    fn arrow_writer_list_non_null() {
1644        // define schema
1645        let schema = Schema::new(vec![Field::new(
1646            "a",
1647            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1648            false,
1649        )]);
1650
1651        // create some data
1652        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1653
1654        // Construct a buffer for value offsets, for the nested array:
1655        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
1656        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1657
1658        // Construct a list array from the above two
1659        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1660            DataType::Int32,
1661            false,
1662        ))))
1663        .len(5)
1664        .add_buffer(a_value_offsets)
1665        .add_child_data(a_values.into_data())
1666        .build()
1667        .unwrap();
1668        let a = ListArray::from(a_list_data);
1669
1670        // build a record batch
1671        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1672
1673        // This test fails if the max row group size is less than the batch's length
1674        // see https://github.com/apache/arrow-rs/issues/518
1675        assert_eq!(batch.column(0).null_count(), 0);
1676
1677        roundtrip(batch, None);
1678    }
1679
1680    #[test]
1681    fn arrow_writer_binary() {
1682        let string_field = Field::new("a", DataType::Utf8, false);
1683        let binary_field = Field::new("b", DataType::Binary, false);
1684        let schema = Schema::new(vec![string_field, binary_field]);
1685
1686        let raw_string_values = vec!["foo", "bar", "baz", "quux"];
1687        let raw_binary_values = [
1688            b"foo".to_vec(),
1689            b"bar".to_vec(),
1690            b"baz".to_vec(),
1691            b"quux".to_vec(),
1692        ];
1693        let raw_binary_value_refs = raw_binary_values
1694            .iter()
1695            .map(|x| x.as_slice())
1696            .collect::<Vec<_>>();
1697
1698        let string_values = StringArray::from(raw_string_values.clone());
1699        let binary_values = BinaryArray::from(raw_binary_value_refs);
1700        let batch = RecordBatch::try_new(
1701            Arc::new(schema),
1702            vec![Arc::new(string_values), Arc::new(binary_values)],
1703        )
1704        .unwrap();
1705
1706        roundtrip(batch, Some(SMALL_SIZE / 2));
1707    }
1708
1709    #[test]
1710    fn arrow_writer_binary_view() {
1711        let string_field = Field::new("a", DataType::Utf8View, false);
1712        let binary_field = Field::new("b", DataType::BinaryView, false);
1713        let nullable_string_field = Field::new("a", DataType::Utf8View, true);
1714        let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
1715
1716        let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
1717        let raw_binary_values = vec![
1718            b"foo".to_vec(),
1719            b"bar".to_vec(),
1720            b"large payload over 12 bytes".to_vec(),
1721            b"lulu".to_vec(),
1722        ];
1723        let nullable_string_values =
1724            vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
1725
1726        let string_view_values = StringViewArray::from(raw_string_values);
1727        let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
1728        let nullable_string_view_values = StringViewArray::from(nullable_string_values);
1729        let batch = RecordBatch::try_new(
1730            Arc::new(schema),
1731            vec![
1732                Arc::new(string_view_values),
1733                Arc::new(binary_view_values),
1734                Arc::new(nullable_string_view_values),
1735            ],
1736        )
1737        .unwrap();
1738
1739        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1740        roundtrip(batch, None);
1741    }
1742
1743    fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
1744        let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
1745        let schema = Schema::new(vec![decimal_field]);
1746
1747        let decimal_values = vec![10_000, 50_000, 0, -100]
1748            .into_iter()
1749            .map(Some)
1750            .collect::<Decimal128Array>()
1751            .with_precision_and_scale(precision, scale)
1752            .unwrap();
1753
1754        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
1755    }
1756
1757    #[test]
1758    fn arrow_writer_decimal() {
1759        // int32 to store the decimal value
1760        let batch_int32_decimal = get_decimal_batch(5, 2);
1761        roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
1762        // int64 to store the decimal value
1763        let batch_int64_decimal = get_decimal_batch(12, 2);
1764        roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
1765        // fixed_length_byte_array to store the decimal value
1766        let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
1767        roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
1768    }
1769
1770    #[test]
1771    fn arrow_writer_complex() {
1772        // define schema
1773        let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1774        let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1775        let struct_field_g = Arc::new(Field::new_list(
1776            "g",
1777            Field::new_list_field(DataType::Int16, true),
1778            false,
1779        ));
1780        let struct_field_h = Arc::new(Field::new_list(
1781            "h",
1782            Field::new_list_field(DataType::Int16, false),
1783            true,
1784        ));
1785        let struct_field_e = Arc::new(Field::new_struct(
1786            "e",
1787            vec![
1788                struct_field_f.clone(),
1789                struct_field_g.clone(),
1790                struct_field_h.clone(),
1791            ],
1792            false,
1793        ));
1794        let schema = Schema::new(vec![
1795            Field::new("a", DataType::Int32, false),
1796            Field::new("b", DataType::Int32, true),
1797            Field::new_struct(
1798                "c",
1799                vec![struct_field_d.clone(), struct_field_e.clone()],
1800                false,
1801            ),
1802        ]);
1803
1804        // create some data
1805        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1806        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1807        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1808        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1809
1810        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1811
1812        // Construct a buffer for value offsets, for the nested array:
1813        //  [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
1814        let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1815
1816        // Construct a list array from the above two
1817        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1818            .len(5)
1819            .add_buffer(g_value_offsets.clone())
1820            .add_child_data(g_value.to_data())
1821            .build()
1822            .unwrap();
1823        let g = ListArray::from(g_list_data);
1824        // The difference between g and h is that h has a null bitmap
1825        let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
1826            .len(5)
1827            .add_buffer(g_value_offsets)
1828            .add_child_data(g_value.to_data())
1829            .null_bit_buffer(Some(Buffer::from([0b00011011])))
1830            .build()
1831            .unwrap();
1832        let h = ListArray::from(h_list_data);
1833
1834        let e = StructArray::from(vec![
1835            (struct_field_f, Arc::new(f) as ArrayRef),
1836            (struct_field_g, Arc::new(g) as ArrayRef),
1837            (struct_field_h, Arc::new(h) as ArrayRef),
1838        ]);
1839
1840        let c = StructArray::from(vec![
1841            (struct_field_d, Arc::new(d) as ArrayRef),
1842            (struct_field_e, Arc::new(e) as ArrayRef),
1843        ]);
1844
1845        // build a record batch
1846        let batch = RecordBatch::try_new(
1847            Arc::new(schema),
1848            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
1849        )
1850        .unwrap();
1851
1852        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1853        roundtrip(batch, Some(SMALL_SIZE / 3));
1854    }
1855
1856    #[test]
1857    fn arrow_writer_complex_mixed() {
1858        // This test was added while investigating https://github.com/apache/arrow-rs/issues/244.
1859        // It was subsequently fixed while investigating https://github.com/apache/arrow-rs/issues/245.
1860
1861        // define schema
1862        let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
1863        let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
1864        let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
1865        let schema = Schema::new(vec![Field::new(
1866            "some_nested_object",
1867            DataType::Struct(Fields::from(vec![
1868                offset_field.clone(),
1869                partition_field.clone(),
1870                topic_field.clone(),
1871            ])),
1872            false,
1873        )]);
1874
1875        // create some data
1876        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1877        let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1878        let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
1879
1880        let some_nested_object = StructArray::from(vec![
1881            (offset_field, Arc::new(offset) as ArrayRef),
1882            (partition_field, Arc::new(partition) as ArrayRef),
1883            (topic_field, Arc::new(topic) as ArrayRef),
1884        ]);
1885
1886        // build a record batch
1887        let batch =
1888            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1889
1890        roundtrip(batch, Some(SMALL_SIZE / 2));
1891    }
1892
1893    #[test]
1894    fn arrow_writer_map() {
1895        // Note: we are using the JSON Arrow reader for brevity
1896        let json_content = r#"
1897        {"stocks":{"long": "$AAA", "short": "$BBB"}}
1898        {"stocks":{"long": null, "long": "$CCC", "short": null}}
1899        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1900        "#;
1901        let entries_struct_type = DataType::Struct(Fields::from(vec![
1902            Field::new("key", DataType::Utf8, false),
1903            Field::new("value", DataType::Utf8, true),
1904        ]));
1905        let stocks_field = Field::new(
1906            "stocks",
1907            DataType::Map(
1908                Arc::new(Field::new("entries", entries_struct_type, false)),
1909                false,
1910            ),
1911            true,
1912        );
1913        let schema = Arc::new(Schema::new(vec![stocks_field]));
1914        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
1915        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
1916
1917        let batch = reader.next().unwrap().unwrap();
1918        roundtrip(batch, None);
1919    }
1920
1921    #[test]
1922    fn arrow_writer_2_level_struct() {
1923        // tests writing struct<struct<primitive>>
1924        let field_c = Field::new("c", DataType::Int32, true);
1925        let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
1926        let type_a = DataType::Struct(vec![field_b.clone()].into());
1927        let field_a = Field::new("a", type_a, true);
1928        let schema = Schema::new(vec![field_a.clone()]);
1929
1930        // create data
1931        let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
1932        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
1933            .len(6)
1934            .null_bit_buffer(Some(Buffer::from([0b00100111])))
1935            .add_child_data(c.into_data())
1936            .build()
1937            .unwrap();
1938        let b = StructArray::from(b_data);
1939        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
1940            .len(6)
1941            .null_bit_buffer(Some(Buffer::from([0b00101111])))
1942            .add_child_data(b.into_data())
1943            .build()
1944            .unwrap();
1945        let a = StructArray::from(a_data);
1946
1947        assert_eq!(a.null_count(), 1);
1948        assert_eq!(a.column(0).null_count(), 2);
1949
1950        // build a record batch
1951        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1952
1953        roundtrip(batch, Some(SMALL_SIZE / 2));
1954    }
1955
1956    #[test]
1957    fn arrow_writer_2_level_struct_non_null() {
1958        // tests writing struct<struct<primitive>>
1959        let field_c = Field::new("c", DataType::Int32, false);
1960        let type_b = DataType::Struct(vec![field_c].into());
1961        let field_b = Field::new("b", type_b.clone(), false);
1962        let type_a = DataType::Struct(vec![field_b].into());
1963        let field_a = Field::new("a", type_a.clone(), false);
1964        let schema = Schema::new(vec![field_a]);
1965
1966        // create data
1967        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
1968        let b_data = ArrayDataBuilder::new(type_b)
1969            .len(6)
1970            .add_child_data(c.into_data())
1971            .build()
1972            .unwrap();
1973        let b = StructArray::from(b_data);
1974        let a_data = ArrayDataBuilder::new(type_a)
1975            .len(6)
1976            .add_child_data(b.into_data())
1977            .build()
1978            .unwrap();
1979        let a = StructArray::from(a_data);
1980
1981        assert_eq!(a.null_count(), 0);
1982        assert_eq!(a.column(0).null_count(), 0);
1983
1984        // build a record batch
1985        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1986
1987        roundtrip(batch, Some(SMALL_SIZE / 2));
1988    }
1989
1990    #[test]
1991    fn arrow_writer_2_level_struct_mixed_null() {
1992        // tests writing struct<struct<primitive>>
1993        let field_c = Field::new("c", DataType::Int32, false);
1994        let type_b = DataType::Struct(vec![field_c].into());
1995        let field_b = Field::new("b", type_b.clone(), true);
1996        let type_a = DataType::Struct(vec![field_b].into());
1997        let field_a = Field::new("a", type_a.clone(), false);
1998        let schema = Schema::new(vec![field_a]);
1999
2000        // create data
2001        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2002        let b_data = ArrayDataBuilder::new(type_b)
2003            .len(6)
2004            .null_bit_buffer(Some(Buffer::from([0b00100111])))
2005            .add_child_data(c.into_data())
2006            .build()
2007            .unwrap();
2008        let b = StructArray::from(b_data);
2009        // `a` intentionally has no null buffer, to test that this is handled correctly
2010        let a_data = ArrayDataBuilder::new(type_a)
2011            .len(6)
2012            .add_child_data(b.into_data())
2013            .build()
2014            .unwrap();
2015        let a = StructArray::from(a_data);
2016
2017        assert_eq!(a.null_count(), 0);
2018        assert_eq!(a.column(0).null_count(), 2);
2019
2020        // build a record batch
2021        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2022
2023        roundtrip(batch, Some(SMALL_SIZE / 2));
2024    }
2025
2026    #[test]
2027    fn arrow_writer_2_level_struct_mixed_null_2() {
2028        // tests writing struct<struct<primitive>>, where the primitive columns are non-null.
2029        let field_c = Field::new("c", DataType::Int32, false);
2030        let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
2031        let field_e = Field::new(
2032            "e",
2033            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2034            false,
2035        );
2036
2037        let field_b = Field::new(
2038            "b",
2039            DataType::Struct(vec![field_c, field_d, field_e].into()),
2040            false,
2041        );
2042        let type_a = DataType::Struct(vec![field_b.clone()].into());
2043        let field_a = Field::new("a", type_a, true);
2044        let schema = Schema::new(vec![field_a.clone()]);
2045
2046        // create data
2047        let c = Int32Array::from_iter_values(0..6);
2048        let d = FixedSizeBinaryArray::try_from_iter(
2049            ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
2050        )
2051        .expect("four byte values");
2052        let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
2053        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2054            .len(6)
2055            .add_child_data(c.into_data())
2056            .add_child_data(d.into_data())
2057            .add_child_data(e.into_data())
2058            .build()
2059            .unwrap();
2060        let b = StructArray::from(b_data);
2061        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2062            .len(6)
2063            .null_bit_buffer(Some(Buffer::from([0b00100101])))
2064            .add_child_data(b.into_data())
2065            .build()
2066            .unwrap();
2067        let a = StructArray::from(a_data);
2068
2069        assert_eq!(a.null_count(), 3);
2070        assert_eq!(a.column(0).null_count(), 0);
2071
2072        // build a record batch
2073        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2074
2075        roundtrip(batch, Some(SMALL_SIZE / 2));
2076    }
2077
2078    #[test]
2079    fn test_fixed_size_binary_in_dict() {
2080        fn test_fixed_size_binary_in_dict_inner<K>()
2081        where
2082            K: ArrowDictionaryKeyType,
2083            K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
2084            <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
2085        {
2086            let field = Field::new(
2087                "a",
2088                DataType::Dictionary(
2089                    Box::new(K::DATA_TYPE),
2090                    Box::new(DataType::FixedSizeBinary(4)),
2091                ),
2092                false,
2093            );
2094            let schema = Schema::new(vec![field]);
2095
2096            let keys: Vec<K::Native> = vec![
2097                K::Native::try_from(0u8).unwrap(),
2098                K::Native::try_from(0u8).unwrap(),
2099                K::Native::try_from(1u8).unwrap(),
2100            ];
2101            let keys = PrimitiveArray::<K>::from_iter_values(keys);
2102            let values = FixedSizeBinaryArray::try_from_iter(
2103                vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
2104            )
2105            .unwrap();
2106
2107            let data = DictionaryArray::<K>::new(keys, Arc::new(values));
2108            let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
2109            roundtrip(batch, None);
2110        }
2111
2112        test_fixed_size_binary_in_dict_inner::<UInt8Type>();
2113        test_fixed_size_binary_in_dict_inner::<UInt16Type>();
2114        test_fixed_size_binary_in_dict_inner::<UInt32Type>();
2115        test_fixed_size_binary_in_dict_inner::<UInt64Type>();
2116        test_fixed_size_binary_in_dict_inner::<Int8Type>();
2117        test_fixed_size_binary_in_dict_inner::<Int16Type>();
2118        test_fixed_size_binary_in_dict_inner::<Int32Type>();
2119        test_fixed_size_binary_in_dict_inner::<Int64Type>();
2120    }
2121
2122    #[test]
2123    fn test_empty_dict() {
2124        let struct_fields = Fields::from(vec![Field::new(
2125            "dict",
2126            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2127            false,
2128        )]);
2129
2130        let schema = Schema::new(vec![Field::new_struct(
2131            "struct",
2132            struct_fields.clone(),
2133            true,
2134        )]);
2135        let dictionary = Arc::new(DictionaryArray::new(
2136            Int32Array::new_null(5),
2137            Arc::new(StringArray::new_null(0)),
2138        ));
2139
2140        let s = StructArray::new(
2141            struct_fields,
2142            vec![dictionary],
2143            Some(NullBuffer::new_null(5)),
2144        );
2145
2146        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
2147        roundtrip(batch, None);
2148    }
2149    #[test]
2150    fn arrow_writer_page_size() {
2151        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
2152
2153        let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
2154
2155        // Generate an array of ten unique 10-character strings
2156        for i in 0..10 {
2157            let value = i
2158                .to_string()
2159                .repeat(10)
2160                .chars()
2161                .take(10)
2162                .collect::<String>();
2163
2164            builder.append_value(value);
2165        }
2166
2167        let array = Arc::new(builder.finish());
2168
2169        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
2170
2171        let file = tempfile::tempfile().unwrap();
2172
2173        // Set everything very low so we fall back to PLAIN encoding after the first row
2174        let props = WriterProperties::builder()
2175            .set_data_page_size_limit(1)
2176            .set_dictionary_page_size_limit(1)
2177            .set_write_batch_size(1)
2178            .build();
2179
2180        let mut writer =
2181            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
2182                .expect("Unable to write file");
2183        writer.write(&batch).unwrap();
2184        writer.close().unwrap();
2185
2186        let options = ReadOptionsBuilder::new().with_page_index().build();
2187        let reader =
2188            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap();
2189
2190        let column = reader.metadata().row_group(0).columns();
2191
2192        assert_eq!(column.len(), 1);
2193
2194        // We should write one row before falling back to PLAIN encoding, so there should still be a
2195        // dictionary page.
2196        assert!(
2197            column[0].dictionary_page_offset().is_some(),
2198            "Expected a dictionary page"
2199        );
2200
2201        assert!(reader.metadata().offset_index().is_some());
2202        let offset_indexes = &reader.metadata().offset_index().unwrap()[0];
2203
2204        let page_locations = offset_indexes[0].page_locations.clone();
2205
2206        // We should fall back to PLAIN encoding after the first row, and our max page size is 1 byte,
2207        // so we expect one dictionary-encoded page and then a page per row thereafter.
2208        assert_eq!(
2209            page_locations.len(),
2210            10,
2211            "Expected 10 pages but got {page_locations:#?}"
2212        );
2213    }
2214
2215    #[test]
2216    fn arrow_writer_float_nans() {
2217        let f16_field = Field::new("a", DataType::Float16, false);
2218        let f32_field = Field::new("b", DataType::Float32, false);
2219        let f64_field = Field::new("c", DataType::Float64, false);
2220        let schema = Schema::new(vec![f16_field, f32_field, f64_field]);
2221
2222        let f16_values = (0..MEDIUM_SIZE)
2223            .map(|i| {
2224                Some(if i % 2 == 0 {
2225                    f16::NAN
2226                } else {
2227                    f16::from_f32(i as f32)
2228                })
2229            })
2230            .collect::<Float16Array>();
2231
2232        let f32_values = (0..MEDIUM_SIZE)
2233            .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
2234            .collect::<Float32Array>();
2235
2236        let f64_values = (0..MEDIUM_SIZE)
2237            .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
2238            .collect::<Float64Array>();
2239
2240        let batch = RecordBatch::try_new(
2241            Arc::new(schema),
2242            vec![
2243                Arc::new(f16_values),
2244                Arc::new(f32_values),
2245                Arc::new(f64_values),
2246            ],
2247        )
2248        .unwrap();
2249
2250        roundtrip(batch, None);
2251    }
2252
2253    const SMALL_SIZE: usize = 7;
2254    const MEDIUM_SIZE: usize = 63;
2255
2256    fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<File> {
2257        let mut files = vec![];
2258        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2259            let mut props = WriterProperties::builder().set_writer_version(version);
2260
2261            if let Some(size) = max_row_group_size {
2262                props = props.set_max_row_group_size(size)
2263            }
2264
2265            let props = props.build();
2266            files.push(roundtrip_opts(&expected_batch, props))
2267        }
2268        files
2269    }
2270
2271    fn roundtrip_opts_with_array_validation<F>(
2272        expected_batch: &RecordBatch,
2273        props: WriterProperties,
2274        validate: F,
2275    ) -> File
2276    where
2277        F: Fn(&ArrayData, &ArrayData),
2278    {
2279        let file = tempfile::tempfile().unwrap();
2280
2281        let mut writer = ArrowWriter::try_new(
2282            file.try_clone().unwrap(),
2283            expected_batch.schema(),
2284            Some(props),
2285        )
2286        .expect("Unable to write file");
2287        writer.write(expected_batch).unwrap();
2288        writer.close().unwrap();
2289
2290        let mut record_batch_reader =
2291            ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
2292
2293        let actual_batch = record_batch_reader
2294            .next()
2295            .expect("No batch found")
2296            .expect("Unable to get batch");
2297
2298        assert_eq!(expected_batch.schema(), actual_batch.schema());
2299        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
2300        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
2301        for i in 0..expected_batch.num_columns() {
2302            let expected_data = expected_batch.column(i).to_data();
2303            let actual_data = actual_batch.column(i).to_data();
2304            validate(&expected_data, &actual_data);
2305        }
2306
2307        file
2308    }
2309
2310    fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> File {
2311        roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
2312            a.validate_full().expect("valid expected data");
2313            b.validate_full().expect("valid actual data");
2314            assert_eq!(a, b)
2315        })
2316    }
2317
2318    struct RoundTripOptions {
2319        values: ArrayRef,
2320        schema: SchemaRef,
2321        bloom_filter: bool,
2322        bloom_filter_position: BloomFilterPosition,
2323    }
2324
2325    impl RoundTripOptions {
2326        fn new(values: ArrayRef, nullable: bool) -> Self {
2327            let data_type = values.data_type().clone();
2328            let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
2329            Self {
2330                values,
2331                schema: Arc::new(schema),
2332                bloom_filter: false,
2333                bloom_filter_position: BloomFilterPosition::AfterRowGroup,
2334            }
2335        }
2336    }
2337
2338    fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<File> {
2339        one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
2340    }
2341
2342    fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<File> {
2343        let mut options = RoundTripOptions::new(values, false);
2344        options.schema = schema;
2345        one_column_roundtrip_with_options(options)
2346    }
2347
2348    fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<File> {
2349        let RoundTripOptions {
2350            values,
2351            schema,
2352            bloom_filter,
2353            bloom_filter_position,
2354        } = options;
2355
2356        let encodings = match values.data_type() {
2357            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
2358                vec![
2359                    Encoding::PLAIN,
2360                    Encoding::DELTA_BYTE_ARRAY,
2361                    Encoding::DELTA_LENGTH_BYTE_ARRAY,
2362                ]
2363            }
2364            DataType::Int64
2365            | DataType::Int32
2366            | DataType::Int16
2367            | DataType::Int8
2368            | DataType::UInt64
2369            | DataType::UInt32
2370            | DataType::UInt16
2371            | DataType::UInt8 => vec![
2372                Encoding::PLAIN,
2373                Encoding::DELTA_BINARY_PACKED,
2374                Encoding::BYTE_STREAM_SPLIT,
2375            ],
2376            DataType::Float32 | DataType::Float64 => {
2377                vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
2378            }
2379            _ => vec![Encoding::PLAIN],
2380        };
2381
2382        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2383
2384        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2385
2386        let mut files = vec![];
2387        for dictionary_size in [0, 1, 1024] {
2388            for encoding in &encodings {
2389                for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2390                    for row_group_size in row_group_sizes {
2391                        let props = WriterProperties::builder()
2392                            .set_writer_version(version)
2393                            .set_max_row_group_size(row_group_size)
2394                            .set_dictionary_enabled(dictionary_size != 0)
2395                            .set_dictionary_page_size_limit(dictionary_size.max(1))
2396                            .set_encoding(*encoding)
2397                            .set_bloom_filter_enabled(bloom_filter)
2398                            .set_bloom_filter_position(bloom_filter_position)
2399                            .build();
2400
2401                        files.push(roundtrip_opts(&expected_batch, props))
2402                    }
2403                }
2404            }
2405        }
2406        files
2407    }
2408
2409    fn values_required<A, I>(iter: I) -> Vec<File>
2410    where
2411        A: From<Vec<I::Item>> + Array + 'static,
2412        I: IntoIterator,
2413    {
2414        let raw_values: Vec<_> = iter.into_iter().collect();
2415        let values = Arc::new(A::from(raw_values));
2416        one_column_roundtrip(values, false)
2417    }
2418
2419    fn values_optional<A, I>(iter: I) -> Vec<File>
2420    where
2421        A: From<Vec<Option<I::Item>>> + Array + 'static,
2422        I: IntoIterator,
2423    {
2424        let optional_raw_values: Vec<_> = iter
2425            .into_iter()
2426            .enumerate()
2427            .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
2428            .collect();
2429        let optional_values = Arc::new(A::from(optional_raw_values));
2430        one_column_roundtrip(optional_values, true)
2431    }
2432
2433    fn required_and_optional<A, I>(iter: I)
2434    where
2435        A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
2436        I: IntoIterator + Clone,
2437    {
2438        values_required::<A, I>(iter.clone());
2439        values_optional::<A, I>(iter);
2440    }
2441
2442    fn check_bloom_filter<T: AsBytes>(
2443        files: Vec<File>,
2444        file_column: String,
2445        positive_values: Vec<T>,
2446        negative_values: Vec<T>,
2447    ) {
2448        files.into_iter().take(1).for_each(|file| {
2449            let file_reader = SerializedFileReader::new_with_options(
2450                file,
2451                ReadOptionsBuilder::new()
2452                    .with_reader_properties(
2453                        ReaderProperties::builder()
2454                            .set_read_bloom_filter(true)
2455                            .build(),
2456                    )
2457                    .build(),
2458            )
2459            .expect("Unable to open file as Parquet");
2460            let metadata = file_reader.metadata();
2461
2462            // Gets bloom filters from all row groups.
2463            let mut bloom_filters: Vec<_> = vec![];
2464            for (ri, row_group) in metadata.row_groups().iter().enumerate() {
2465                if let Some((column_index, _)) = row_group
2466                    .columns()
2467                    .iter()
2468                    .enumerate()
2469                    .find(|(_, column)| column.column_path().string() == file_column)
2470                {
2471                    let row_group_reader = file_reader
2472                        .get_row_group(ri)
2473                        .expect("Unable to read row group");
2474                    if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
2475                        bloom_filters.push(sbbf.clone());
2476                    } else {
2477                        panic!("No bloom filter for column named {file_column} found");
2478                    }
2479                } else {
2480                    panic!("No column named {file_column} found");
2481                }
2482            }
2483
2484            positive_values.iter().for_each(|value| {
2485                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2486                assert!(
2487                    found.is_some(),
2488                "Value {:?} should be in bloom filter",
2489                value.as_bytes()
2490                );
2491            });
2492
2493            negative_values.iter().for_each(|value| {
2494                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2495                assert!(
2496                    found.is_none(),
2497                "Value {:?} should not be in bloom filter",
2498                value.as_bytes()
2499                );
2500            });
2501        });
2502    }
2503
2504    #[test]
2505    fn all_null_primitive_single_column() {
2506        let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
2507        one_column_roundtrip(values, true);
2508    }
2509    #[test]
2510    fn null_single_column() {
2511        let values = Arc::new(NullArray::new(SMALL_SIZE));
2512        one_column_roundtrip(values, true);
2513        // Null arrays are always nullable; a test with non-nullable nulls fails
2514    }
2515
2516    #[test]
2517    fn bool_single_column() {
2518        required_and_optional::<BooleanArray, _>(
2519            [true, false].iter().cycle().copied().take(SMALL_SIZE),
2520        );
2521    }
2522
2523    #[test]
2524    fn bool_large_single_column() {
2525        let values = Arc::new(
2526            [None, Some(true), Some(false)]
2527                .iter()
2528                .cycle()
2529                .copied()
2530                .take(200_000)
2531                .collect::<BooleanArray>(),
2532        );
2533        let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
2534        let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2535        let file = tempfile::tempfile().unwrap();
2536
2537        let mut writer =
2538            ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
2539                .expect("Unable to write file");
2540        writer.write(&expected_batch).unwrap();
2541        writer.close().unwrap();
2542    }
2543
2544    #[test]
2545    fn check_page_offset_index_with_nan() {
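        // Every value is NaN, so usable min/max statistics cannot be produced; the assertions
        // below expect an offset index to be written but no column index.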
2546        let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
2547        let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
2548        let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2549
2550        let mut out = Vec::with_capacity(1024);
2551        let mut writer =
2552            ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
2553        writer.write(&batch).unwrap();
2554        let file_meta_data = writer.close().unwrap();
2555        for row_group in file_meta_data.row_groups {
2556            for column in row_group.columns {
2557                assert!(column.offset_index_offset.is_some());
2558                assert!(column.offset_index_length.is_some());
2559                assert!(column.column_index_offset.is_none());
2560                assert!(column.column_index_length.is_none());
2561            }
2562        }
2563    }
2564
2565    #[test]
2566    fn i8_single_column() {
2567        required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
2568    }
2569
2570    #[test]
2571    fn i16_single_column() {
2572        required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
2573    }
2574
2575    #[test]
2576    fn i32_single_column() {
2577        required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
2578    }
2579
2580    #[test]
2581    fn i64_single_column() {
2582        required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
2583    }
2584
2585    #[test]
2586    fn u8_single_column() {
2587        required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
2588    }
2589
2590    #[test]
2591    fn u16_single_column() {
2592        required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
2593    }
2594
2595    #[test]
2596    fn u32_single_column() {
2597        required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
2598    }
2599
2600    #[test]
2601    fn u64_single_column() {
2602        required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
2603    }
2604
2605    #[test]
2606    fn f32_single_column() {
2607        required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
2608    }
2609
2610    #[test]
2611    fn f64_single_column() {
2612        required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
2613    }
2614
2615    // The timestamp array types don't implement From<Vec<T>> because they need the timezone
2616    // argument, and they also don't support building from a Vec<Option<T>>, so call
2617    // one_column_roundtrip manually instead of calling required_and_optional for these tests.
2618
2619    #[test]
2620    fn timestamp_second_single_column() {
2621        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2622        let values = Arc::new(TimestampSecondArray::from(raw_values));
2623
2624        one_column_roundtrip(values, false);
2625    }
2626
2627    #[test]
2628    fn timestamp_millisecond_single_column() {
2629        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2630        let values = Arc::new(TimestampMillisecondArray::from(raw_values));
2631
2632        one_column_roundtrip(values, false);
2633    }
2634
2635    #[test]
2636    fn timestamp_microsecond_single_column() {
2637        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2638        let values = Arc::new(TimestampMicrosecondArray::from(raw_values));
2639
2640        one_column_roundtrip(values, false);
2641    }
2642
2643    #[test]
2644    fn timestamp_nanosecond_single_column() {
2645        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2646        let values = Arc::new(TimestampNanosecondArray::from(raw_values));
2647
2648        one_column_roundtrip(values, false);
2649    }
2650
2651    #[test]
2652    fn date32_single_column() {
2653        required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
2654    }
2655
2656    #[test]
2657    fn date64_single_column() {
2658        // Date64 must be a multiple of 86400000, see ARROW-10925
2659        required_and_optional::<Date64Array, _>(
2660            (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
2661        );
2662    }
2663
2664    #[test]
2665    fn time32_second_single_column() {
2666        required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
2667    }
2668
2669    #[test]
2670    fn time32_millisecond_single_column() {
2671        required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
2672    }
2673
2674    #[test]
2675    fn time64_microsecond_single_column() {
2676        required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
2677    }
2678
2679    #[test]
2680    fn time64_nanosecond_single_column() {
2681        required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
2682    }
2683
2684    #[test]
2685    fn duration_second_single_column() {
2686        required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
2687    }
2688
2689    #[test]
2690    fn duration_millisecond_single_column() {
2691        required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
2692    }
2693
2694    #[test]
2695    fn duration_microsecond_single_column() {
2696        required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
2697    }
2698
2699    #[test]
2700    fn duration_nanosecond_single_column() {
2701        required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
2702    }
2703
2704    #[test]
2705    fn interval_year_month_single_column() {
2706        required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
2707    }
2708
2709    #[test]
2710    fn interval_day_time_single_column() {
2711        required_and_optional::<IntervalDayTimeArray, _>(vec![
2712            IntervalDayTime::new(0, 1),
2713            IntervalDayTime::new(0, 3),
2714            IntervalDayTime::new(3, -2),
2715            IntervalDayTime::new(-200, 4),
2716        ]);
2717    }
2718
2719    #[test]
2720    #[should_panic(
2721        expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
2722    )]
2723    fn interval_month_day_nano_single_column() {
2724        required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
2725            IntervalMonthDayNano::new(0, 1, 5),
2726            IntervalMonthDayNano::new(0, 3, 2),
2727            IntervalMonthDayNano::new(3, -2, -5),
2728            IntervalMonthDayNano::new(-200, 4, -1),
2729        ]);
2730    }
2731
2732    #[test]
2733    fn binary_single_column() {
2734        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2735        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2736        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2737
2738        // BinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2739        values_required::<BinaryArray, _>(many_vecs_iter);
2740    }
2741
2742    #[test]
2743    fn binary_view_single_column() {
2744        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2745        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2746        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2747
2748        // BinaryViewArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2749        values_required::<BinaryViewArray, _>(many_vecs_iter);
2750    }
2751
2752    #[test]
2753    fn i32_column_bloom_filter_at_end() {
2754        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2755        let mut options = RoundTripOptions::new(array, false);
2756        options.bloom_filter = true;
2757        options.bloom_filter_position = BloomFilterPosition::End;
2758
2759        let files = one_column_roundtrip_with_options(options);
2760        check_bloom_filter(
2761            files,
2762            "col".to_string(),
2763            (0..SMALL_SIZE as i32).collect(),
2764            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2765        );
2766    }
2767
2768    #[test]
2769    fn i32_column_bloom_filter() {
2770        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2771        let mut options = RoundTripOptions::new(array, false);
2772        options.bloom_filter = true;
2773
2774        let files = one_column_roundtrip_with_options(options);
2775        check_bloom_filter(
2776            files,
2777            "col".to_string(),
2778            (0..SMALL_SIZE as i32).collect(),
2779            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2780        );
2781    }
2782
2783    #[test]
2784    fn binary_column_bloom_filter() {
2785        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2786        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2787        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2788
2789        let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
2790        let mut options = RoundTripOptions::new(array, false);
2791        options.bloom_filter = true;
2792
2793        let files = one_column_roundtrip_with_options(options);
2794        check_bloom_filter(
2795            files,
2796            "col".to_string(),
2797            many_vecs,
2798            vec![vec![(SMALL_SIZE + 1) as u8]],
2799        );
2800    }
2801
2802    #[test]
2803    fn empty_string_null_column_bloom_filter() {
2804        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2805        let raw_strs = raw_values.iter().map(|s| s.as_str());
2806
2807        let array = Arc::new(StringArray::from_iter_values(raw_strs));
2808        let mut options = RoundTripOptions::new(array, false);
2809        options.bloom_filter = true;
2810
2811        let files = one_column_roundtrip_with_options(options);
2812
2813        let optional_raw_values: Vec<_> = raw_values
2814            .iter()
2815            .enumerate()
2816            .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
2817            .collect();
2818        // For null slots, the empty string should not be in the bloom filter.
2819        check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
2820    }
2821
2822    #[test]
2823    fn large_binary_single_column() {
2824        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2825        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2826        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2827
2828        // LargeBinaryArrays can't be built from Vec<Option<&str>>, so only call `values_required`
2829        values_required::<LargeBinaryArray, _>(many_vecs_iter);
2830    }
2831
2832    #[test]
2833    fn fixed_size_binary_single_column() {
2834        let mut builder = FixedSizeBinaryBuilder::new(4);
2835        builder.append_value(b"0123").unwrap();
2836        builder.append_null();
2837        builder.append_value(b"8910").unwrap();
2838        builder.append_value(b"1112").unwrap();
2839        let array = Arc::new(builder.finish());
2840
2841        one_column_roundtrip(array, true);
2842    }
2843
2844    #[test]
2845    fn string_single_column() {
2846        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2847        let raw_strs = raw_values.iter().map(|s| s.as_str());
2848
2849        required_and_optional::<StringArray, _>(raw_strs);
2850    }
2851
2852    #[test]
2853    fn large_string_single_column() {
2854        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2855        let raw_strs = raw_values.iter().map(|s| s.as_str());
2856
2857        required_and_optional::<LargeStringArray, _>(raw_strs);
2858    }
2859
2860    #[test]
2861    fn string_view_single_column() {
2862        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2863        let raw_strs = raw_values.iter().map(|s| s.as_str());
2864
2865        required_and_optional::<StringViewArray, _>(raw_strs);
2866    }
2867
2868    #[test]
2869    fn null_list_single_column() {
2870        let null_field = Field::new_list_field(DataType::Null, true);
2871        let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);
2872
2873        let schema = Schema::new(vec![list_field]);
2874
2875        // Build [[], null, [null, null]]
2876        let a_values = NullArray::new(2);
2877        let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
2878        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2879            DataType::Null,
2880            true,
2881        ))))
2882        .len(3)
2883        .add_buffer(a_value_offsets)
2884        .null_bit_buffer(Some(Buffer::from([0b00000101])))
2885        .add_child_data(a_values.into_data())
2886        .build()
2887        .unwrap();
2888
2889        let a = ListArray::from(a_list_data);
2890
2891        assert!(a.is_valid(0));
2892        assert!(!a.is_valid(1));
2893        assert!(a.is_valid(2));
2894
2895        assert_eq!(a.value(0).len(), 0);
2896        assert_eq!(a.value(2).len(), 2);
2897        assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);
2898
2899        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2900        roundtrip(batch, None);
2901    }
2902
2903    #[test]
2904    fn list_single_column() {
2905        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2906        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2907        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2908            DataType::Int32,
2909            false,
2910        ))))
2911        .len(5)
2912        .add_buffer(a_value_offsets)
2913        .null_bit_buffer(Some(Buffer::from([0b00011011])))
2914        .add_child_data(a_values.into_data())
2915        .build()
2916        .unwrap();
2917
2918        assert_eq!(a_list_data.null_count(), 1);
2919
2920        let a = ListArray::from(a_list_data);
2921        let values = Arc::new(a);
2922
2923        one_column_roundtrip(values, true);
2924    }
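
    // A short sketch of how the hand-built buffers above are interpreted:
    // offsets [0, 1, 3, 3, 6, 10] give list lengths [1, 2, 0, 3, 4], and bit 2 of
    // the validity byte 0b00011011 is clear, so list 2 is null. Plain arithmetic,
    // no parquet involved.
    #[test]
    fn list_offsets_and_validity_interpretation() {
        let offsets = [0, 1, 3, 3, 6, 10];
        let lengths: Vec<i32> = offsets.windows(2).map(|w| w[1] - w[0]).collect();
        assert_eq!(lengths, vec![1, 2, 0, 3, 4]);

        // bit i of the validity byte records whether list i is valid
        let validity = 0b00011011u8;
        assert_eq!((validity >> 2) & 1, 0); // list 2 is null
    }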
2925
2926    #[test]
2927    fn large_list_single_column() {
2928        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2929        let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
2930        let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
2931            "large_item",
2932            DataType::Int32,
2933            true,
2934        ))))
2935        .len(5)
2936        .add_buffer(a_value_offsets)
2937        .add_child_data(a_values.into_data())
2938        .null_bit_buffer(Some(Buffer::from([0b00011011])))
2939        .build()
2940        .unwrap();
2941
2942        // The validity buffer (0b00011011) marks index 2 as null, so exactly one null is expected
2943        assert_eq!(a_list_data.null_count(), 1);
2944
2945        let a = LargeListArray::from(a_list_data);
2946        let values = Arc::new(a);
2947
2948        one_column_roundtrip(values, true);
2949    }
2950
2951    #[test]
2952    fn list_nested_nulls() {
2953        use arrow::datatypes::Int32Type;
2954        let data = vec![
2955            Some(vec![Some(1)]),
2956            Some(vec![Some(2), Some(3)]),
2957            None,
2958            Some(vec![Some(4), Some(5), None]),
2959            Some(vec![None]),
2960            Some(vec![Some(6), Some(7)]),
2961        ];
2962
2963        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
2964        one_column_roundtrip(Arc::new(list), true);
2965
2966        let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
2967        one_column_roundtrip(Arc::new(list), true);
2968    }
2969
2970    #[test]
2971    fn struct_single_column() {
2972        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2973        let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
2974        let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
2975
2976        let values = Arc::new(s);
2977        one_column_roundtrip(values, false);
2978    }
2979
2980    #[test]
2981    fn list_and_map_coerced_names() {
2982        // Create map and list with non-Parquet naming
2983        let list_field =
2984            Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
2985        let map_field = Field::new_map(
2986            "my_map",
2987            "entries",
2988            Field::new("keys", DataType::Int32, false),
2989            Field::new("values", DataType::Int32, true),
2990            false,
2991            true,
2992        );
2993
2994        let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
2995        let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();
2996
2997        let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
2998
2999        // Write data to Parquet but coerce names to match spec
3000        let props = Some(WriterProperties::builder().set_coerce_types(true).build());
3001        let file = tempfile::tempfile().unwrap();
3002        let mut writer =
3003            ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();
3004
3005        let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
3006        writer.write(&batch).unwrap();
3007        let file_metadata = writer.close().unwrap();
3008
3009        // Coerced name of "item" should be "element"
3010        assert_eq!(file_metadata.schema[3].name, "element");
3011        // Coerced name of "entries" should be "key_value"
3012        assert_eq!(file_metadata.schema[5].name, "key_value");
3013        // Coerced name of "keys" should be "key"
3014        assert_eq!(file_metadata.schema[6].name, "key");
3015        // Coerced name of "values" should be "value"
3016        assert_eq!(file_metadata.schema[7].name, "value");
3017
3018        // Double check schema after reading from the file
3019        let reader = SerializedFileReader::new(file).unwrap();
3020        let file_schema = reader.metadata().file_metadata().schema();
3021        let fields = file_schema.get_fields();
3022        let list_field = &fields[0].get_fields()[0];
3023        assert_eq!(list_field.get_fields()[0].name(), "element");
3024        let map_field = &fields[1].get_fields()[0];
3025        assert_eq!(map_field.name(), "key_value");
3026        assert_eq!(map_field.get_fields()[0].name(), "key");
3027        assert_eq!(map_field.get_fields()[1].name(), "value");
3028    }
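
    // A minimal sketch, for contrast with the coercion test above: declaring the
    // list field with the Parquet-spec "element" name up front, so no renaming is
    // needed at write time. Hypothetical helper, not used elsewhere.
    #[allow(dead_code)]
    fn example_spec_compliant_list_field() -> Field {
        Field::new_list(
            "my_list",
            Field::new("element", DataType::Int32, false),
            false,
        )
    }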
3029
3030    #[test]
3031    fn fallback_flush_data_page() {
3032        // Tests that Fallback::flush_data_page clears all buffers correctly
3033        let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
3034        let values = Arc::new(StringArray::from(raw_values));
3035        let encodings = vec![
3036            Encoding::DELTA_BYTE_ARRAY,
3037            Encoding::DELTA_LENGTH_BYTE_ARRAY,
3038        ];
3039        let data_type = values.data_type().clone();
3040        let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
3041        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3042
3043        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
3044        let data_page_size_limit: usize = 32;
3045        let write_batch_size: usize = 16;
3046
3047        for encoding in &encodings {
3048            for row_group_size in row_group_sizes {
3049                let props = WriterProperties::builder()
3050                    .set_writer_version(WriterVersion::PARQUET_2_0)
3051                    .set_max_row_group_size(row_group_size)
3052                    .set_dictionary_enabled(false)
3053                    .set_encoding(*encoding)
3054                    .set_data_page_size_limit(data_page_size_limit)
3055                    .set_write_batch_size(write_batch_size)
3056                    .build();
3057
3058                roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
3059                    let string_array_a = StringArray::from(a.clone());
3060                    let string_array_b = StringArray::from(b.clone());
3061                    let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
3062                    let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
3063                    assert_eq!(
3064                        vec_a, vec_b,
3065                        "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
3066                    );
3067                });
3068            }
3069        }
3070    }
3071
3072    #[test]
3073    fn arrow_writer_string_dictionary() {
3074        // define schema
3075        #[allow(deprecated)]
3076        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3077            "dictionary",
3078            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3079            true,
3080            42,
3081            true,
3082        )]));
3083
3084        // create some data
3085        let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3086            .iter()
3087            .copied()
3088            .collect();
3089
3090        // build a record batch
3091        one_column_roundtrip_with_schema(Arc::new(d), schema);
3092    }
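
    // A minimal sketch of declaring the same dictionary field without the
    // deprecated `Field::new_dict`: the dictionary id and ordering flag are not
    // needed for writing, so a plain `Field::new` is sufficient here.
    #[allow(dead_code)]
    fn example_dictionary_field() -> Field {
        Field::new(
            "dictionary",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            true,
        )
    }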
3093
3094    #[test]
3095    fn arrow_writer_dict_and_native_compatibility() {
3096        let schema = Arc::new(Schema::new(vec![Field::new(
3097            "a",
3098            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3099            false,
3100        )]));
3101
3102        let rb1 = RecordBatch::try_new(
3103            schema.clone(),
3104            vec![Arc::new(DictionaryArray::new(
3105                UInt8Array::from_iter_values(vec![0, 1, 0]),
3106                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3107            ))],
3108        )
3109        .unwrap();
3110
3111        let file = tempfile().unwrap();
3112        let mut writer =
3113            ArrowWriter::try_new(file.try_clone().unwrap(), rb1.schema(), None).unwrap();
3114        writer.write(&rb1).unwrap();
3115
3116        // check that another record batch can be appended when its field has the same type
3117        // as the dictionary values from the first batch
3118        let schema2 = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
3119        let rb2 = RecordBatch::try_new(
3120            schema2,
3121            vec![Arc::new(StringArray::from_iter_values(vec![
3122                "barquet", "curious",
3123            ]))],
3124        )
3125        .unwrap();
3126        writer.write(&rb2).unwrap();
3127
3128        writer.close().unwrap();
3129
3130        let mut record_batch_reader =
3131            ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
3132        let actual_batch = record_batch_reader.next().unwrap().unwrap();
3133
3134        let expected_batch = RecordBatch::try_new(
3135            schema,
3136            vec![Arc::new(DictionaryArray::new(
3137                UInt8Array::from_iter_values(vec![0, 1, 0, 1, 2]),
3138                Arc::new(StringArray::from_iter_values(vec![
3139                    "parquet", "barquet", "curious",
3140                ])),
3141            ))],
3142        )
3143        .unwrap();
3144
3145        assert_eq!(actual_batch, expected_batch)
3146    }
3147
3148    #[test]
3149    fn arrow_writer_native_and_dict_compatibility() {
3150        let schema1 = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
3151        let rb1 = RecordBatch::try_new(
3152            schema1.clone(),
3153            vec![Arc::new(StringArray::from_iter_values(vec![
3154                "parquet", "barquet",
3155            ]))],
3156        )
3157        .unwrap();
3158
3159        let file = tempfile().unwrap();
3160        let mut writer =
3161            ArrowWriter::try_new(file.try_clone().unwrap(), rb1.schema(), None).unwrap();
3162        writer.write(&rb1).unwrap();
3163
3164        let schema2 = Arc::new(Schema::new(vec![Field::new(
3165            "a",
3166            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3167            false,
3168        )]));
3169
3170        let rb2 = RecordBatch::try_new(
3171            schema2.clone(),
3172            vec![Arc::new(DictionaryArray::new(
3173                UInt8Array::from_iter_values(vec![0, 1, 0]),
3174                Arc::new(StringArray::from_iter_values(vec!["barquet", "curious"])),
3175            ))],
3176        )
3177        .unwrap();
3178        writer.write(&rb2).unwrap();
3179
3180        writer.close().unwrap();
3181
3182        let mut record_batch_reader =
3183            ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
3184        let actual_batch = record_batch_reader.next().unwrap().unwrap();
3185
3186        let expected_batch = RecordBatch::try_new(
3187            schema1,
3188            vec![Arc::new(StringArray::from_iter_values(vec![
3189                "parquet", "barquet", "barquet", "curious", "barquet",
3190            ]))],
3191        )
3192        .unwrap();
3193
3194        assert_eq!(actual_batch, expected_batch)
3195    }
3196
3197    #[test]
3198    fn arrow_writer_primitive_dictionary() {
3199        // define schema
3200        #[allow(deprecated)]
3201        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3202            "dictionary",
3203            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
3204            true,
3205            42,
3206            true,
3207        )]));
3208
3209        // create some data
3210        let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
3211        builder.append(12345678).unwrap();
3212        builder.append_null();
3213        builder.append(22345678).unwrap();
3214        builder.append(12345678).unwrap();
3215        let d = builder.finish();
3216
3217        one_column_roundtrip_with_schema(Arc::new(d), schema);
3218    }
3219
3220    #[test]
3221    fn arrow_writer_decimal32_dictionary() {
3222        let integers = vec![12345, 56789, 34567];
3223
3224        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3225
3226        let values = Decimal32Array::from(integers.clone())
3227            .with_precision_and_scale(5, 2)
3228            .unwrap();
3229
3230        let array = DictionaryArray::new(keys, Arc::new(values));
3231        one_column_roundtrip(Arc::new(array.clone()), true);
3232
3233        let values = Decimal32Array::from(integers)
3234            .with_precision_and_scale(9, 2)
3235            .unwrap();
3236
3237        let array = array.with_values(Arc::new(values));
3238        one_column_roundtrip(Arc::new(array), true);
3239    }
3240
3241    #[test]
3242    fn arrow_writer_decimal64_dictionary() {
3243        let integers = vec![12345, 56789, 34567];
3244
3245        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3246
3247        let values = Decimal64Array::from(integers.clone())
3248            .with_precision_and_scale(5, 2)
3249            .unwrap();
3250
3251        let array = DictionaryArray::new(keys, Arc::new(values));
3252        one_column_roundtrip(Arc::new(array.clone()), true);
3253
3254        let values = Decimal64Array::from(integers)
3255            .with_precision_and_scale(12, 2)
3256            .unwrap();
3257
3258        let array = array.with_values(Arc::new(values));
3259        one_column_roundtrip(Arc::new(array), true);
3260    }
3261
3262    #[test]
3263    fn arrow_writer_decimal128_dictionary() {
3264        let integers = vec![12345, 56789, 34567];
3265
3266        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3267
3268        let values = Decimal128Array::from(integers.clone())
3269            .with_precision_and_scale(5, 2)
3270            .unwrap();
3271
3272        let array = DictionaryArray::new(keys, Arc::new(values));
3273        one_column_roundtrip(Arc::new(array.clone()), true);
3274
3275        let values = Decimal128Array::from(integers)
3276            .with_precision_and_scale(12, 2)
3277            .unwrap();
3278
3279        let array = array.with_values(Arc::new(values));
3280        one_column_roundtrip(Arc::new(array), true);
3281    }
3282
3283    #[test]
3284    fn arrow_writer_decimal256_dictionary() {
3285        let integers = vec![
3286            i256::from_i128(12345),
3287            i256::from_i128(56789),
3288            i256::from_i128(34567),
3289        ];
3290
3291        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3292
3293        let values = Decimal256Array::from(integers.clone())
3294            .with_precision_and_scale(5, 2)
3295            .unwrap();
3296
3297        let array = DictionaryArray::new(keys, Arc::new(values));
3298        one_column_roundtrip(Arc::new(array.clone()), true);
3299
3300        let values = Decimal256Array::from(integers)
3301            .with_precision_and_scale(12, 2)
3302            .unwrap();
3303
3304        let array = array.with_values(Arc::new(values));
3305        one_column_roundtrip(Arc::new(array), true);
3306    }
3307
3308    #[test]
3309    fn arrow_writer_string_dictionary_unsigned_index() {
3310        // define schema
3311        #[allow(deprecated)]
3312        let schema = Arc::new(Schema::new(vec![Field::new_dict(
3313            "dictionary",
3314            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3315            true,
3316            42,
3317            true,
3318        )]));
3319
3320        // create some data
3321        let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3322            .iter()
3323            .copied()
3324            .collect();
3325
3326        one_column_roundtrip_with_schema(Arc::new(d), schema);
3327    }
3328
3329    #[test]
3330    fn u32_min_max() {
3331        // check values roundtrip through parquet
3332        let src = [
3333            u32::MIN,
3334            u32::MIN + 1,
3335            (i32::MAX as u32) - 1,
3336            i32::MAX as u32,
3337            (i32::MAX as u32) + 1,
3338            u32::MAX - 1,
3339            u32::MAX,
3340        ];
3341        let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
3342        let files = one_column_roundtrip(values, false);
3343
3344        for file in files {
3345            // check statistics are valid
3346            let reader = SerializedFileReader::new(file).unwrap();
3347            let metadata = reader.metadata();
3348
3349            let mut row_offset = 0;
3350            for row_group in metadata.row_groups() {
3351                assert_eq!(row_group.num_columns(), 1);
3352                let column = row_group.column(0);
3353
3354                let num_values = column.num_values() as usize;
3355                let src_slice = &src[row_offset..row_offset + num_values];
3356                row_offset += column.num_values() as usize;
3357
3358                let stats = column.statistics().unwrap();
3359                if let Statistics::Int32(stats) = stats {
3360                    assert_eq!(
3361                        *stats.min_opt().unwrap() as u32,
3362                        *src_slice.iter().min().unwrap()
3363                    );
3364                    assert_eq!(
3365                        *stats.max_opt().unwrap() as u32,
3366                        *src_slice.iter().max().unwrap()
3367                    );
3368                } else {
3369                    panic!("Statistics::Int32 missing")
3370                }
3371            }
3372        }
3373    }
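
    // A small sketch of the unsigned-to-signed reinterpretation the statistics
    // check above relies on: u32 values are stored as physical Int32 (hence
    // `Statistics::Int32`), so values above i32::MAX reappear as negative i32s but
    // round-trip exactly once cast back to u32.
    #[test]
    fn u32_as_i32_reinterpretation() {
        let v: u32 = (i32::MAX as u32) + 1;
        let as_i32 = v as i32; // wraps to i32::MIN
        assert_eq!(as_i32, i32::MIN);
        assert_eq!(as_i32 as u32, v); // lossless when cast back
    }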
3374
3375    #[test]
3376    fn u64_min_max() {
3377        // check values roundtrip through parquet
3378        let src = [
3379            u64::MIN,
3380            u64::MIN + 1,
3381            (i64::MAX as u64) - 1,
3382            i64::MAX as u64,
3383            (i64::MAX as u64) + 1,
3384            u64::MAX - 1,
3385            u64::MAX,
3386        ];
3387        let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
3388        let files = one_column_roundtrip(values, false);
3389
3390        for file in files {
3391            // check statistics are valid
3392            let reader = SerializedFileReader::new(file).unwrap();
3393            let metadata = reader.metadata();
3394
3395            let mut row_offset = 0;
3396            for row_group in metadata.row_groups() {
3397                assert_eq!(row_group.num_columns(), 1);
3398                let column = row_group.column(0);
3399
3400                let num_values = column.num_values() as usize;
3401                let src_slice = &src[row_offset..row_offset + num_values];
3402                row_offset += column.num_values() as usize;
3403
3404                let stats = column.statistics().unwrap();
3405                if let Statistics::Int64(stats) = stats {
3406                    assert_eq!(
3407                        *stats.min_opt().unwrap() as u64,
3408                        *src_slice.iter().min().unwrap()
3409                    );
3410                    assert_eq!(
3411                        *stats.max_opt().unwrap() as u64,
3412                        *src_slice.iter().max().unwrap()
3413                    );
3414                } else {
3415                    panic!("Statistics::Int64 missing")
3416                }
3417            }
3418        }
3419    }
3420
3421    #[test]
3422    fn statistics_null_counts_only_nulls() {
3423        // check that null-count statistics for all-NULL columns are correct
3424        let values = Arc::new(UInt64Array::from(vec![None, None]));
3425        let files = one_column_roundtrip(values, true);
3426
3427        for file in files {
3428            // check statistics are valid
3429            let reader = SerializedFileReader::new(file).unwrap();
3430            let metadata = reader.metadata();
3431            assert_eq!(metadata.num_row_groups(), 1);
3432            let row_group = metadata.row_group(0);
3433            assert_eq!(row_group.num_columns(), 1);
3434            let column = row_group.column(0);
3435            let stats = column.statistics().unwrap();
3436            assert_eq!(stats.null_count_opt(), Some(2));
3437        }
3438    }
3439
3440    #[test]
3441    fn test_list_of_struct_roundtrip() {
3442        // define schema
3443        let int_field = Field::new("a", DataType::Int32, true);
3444        let int_field2 = Field::new("b", DataType::Int32, true);
3445
3446        let int_builder = Int32Builder::with_capacity(10);
3447        let int_builder2 = Int32Builder::with_capacity(10);
3448
3449        let struct_builder = StructBuilder::new(
3450            vec![int_field, int_field2],
3451            vec![Box::new(int_builder), Box::new(int_builder2)],
3452        );
3453        let mut list_builder = ListBuilder::new(struct_builder);
3454
3455        // Construct the following array
3456        // [{a: 1, b: 2}], [], null, [null, null], [{a: null, b: 3}], [{a: 2, b: null}]
3457
3458        // [{a: 1, b: 2}]
3459        let values = list_builder.values();
3460        values
3461            .field_builder::<Int32Builder>(0)
3462            .unwrap()
3463            .append_value(1);
3464        values
3465            .field_builder::<Int32Builder>(1)
3466            .unwrap()
3467            .append_value(2);
3468        values.append(true);
3469        list_builder.append(true);
3470
3471        // []
3472        list_builder.append(true);
3473
3474        // null
3475        list_builder.append(false);
3476
3477        // [null, null]
3478        let values = list_builder.values();
3479        values
3480            .field_builder::<Int32Builder>(0)
3481            .unwrap()
3482            .append_null();
3483        values
3484            .field_builder::<Int32Builder>(1)
3485            .unwrap()
3486            .append_null();
3487        values.append(false);
3488        values
3489            .field_builder::<Int32Builder>(0)
3490            .unwrap()
3491            .append_null();
3492        values
3493            .field_builder::<Int32Builder>(1)
3494            .unwrap()
3495            .append_null();
3496        values.append(false);
3497        list_builder.append(true);
3498
3499        // [{a: null, b: 3}]
3500        let values = list_builder.values();
3501        values
3502            .field_builder::<Int32Builder>(0)
3503            .unwrap()
3504            .append_null();
3505        values
3506            .field_builder::<Int32Builder>(1)
3507            .unwrap()
3508            .append_value(3);
3509        values.append(true);
3510        list_builder.append(true);
3511
3512        // [{a: 2, b: null}]
3513        let values = list_builder.values();
3514        values
3515            .field_builder::<Int32Builder>(0)
3516            .unwrap()
3517            .append_value(2);
3518        values
3519            .field_builder::<Int32Builder>(1)
3520            .unwrap()
3521            .append_null();
3522        values.append(true);
3523        list_builder.append(true);
3524
3525        let array = Arc::new(list_builder.finish());
3526
3527        one_column_roundtrip(array, true);
3528    }
3529
3530    fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
3531        metadata.row_groups().iter().map(|x| x.num_rows()).collect()
3532    }
3533
3534    #[test]
3535    fn test_aggregates_records() {
3536        let arrays = [
3537            Int32Array::from((0..100).collect::<Vec<_>>()),
3538            Int32Array::from((0..50).collect::<Vec<_>>()),
3539            Int32Array::from((200..500).collect::<Vec<_>>()),
3540        ];
3541
3542        let schema = Arc::new(Schema::new(vec![Field::new(
3543            "int",
3544            ArrowDataType::Int32,
3545            false,
3546        )]));
3547
3548        let file = tempfile::tempfile().unwrap();
3549
3550        let props = WriterProperties::builder()
3551            .set_max_row_group_size(200)
3552            .build();
3553
3554        let mut writer =
3555            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
3556
3557        for array in arrays {
3558            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
3559            writer.write(&batch).unwrap();
3560        }
3561
3562        writer.close().unwrap();
3563
3564        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3565        assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);
3566
3567        let batches = builder
3568            .with_batch_size(100)
3569            .build()
3570            .unwrap()
3571            .collect::<ArrowResult<Vec<_>>>()
3572            .unwrap();
3573
3574        assert_eq!(batches.len(), 5);
3575        assert!(batches.iter().all(|x| x.num_columns() == 1));
3576
3577        let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3578
3579        assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
3580
3581        let values: Vec<_> = batches
3582            .iter()
3583            .flat_map(|x| {
3584                x.column(0)
3585                    .as_any()
3586                    .downcast_ref::<Int32Array>()
3587                    .unwrap()
3588                    .values()
3589                    .iter()
3590                    .cloned()
3591            })
3592            .collect();
3593
3594        let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
3595        assert_eq!(&values, &expected_values)
3596    }
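
    // A tiny arithmetic check of the row group sizes asserted above: the three
    // batches hold 100 + 50 + 300 = 450 rows in total, and a 200-row limit splits
    // them into groups of 200, 200 and 50 regardless of the batch boundaries.
    #[test]
    fn aggregates_records_row_group_arithmetic() {
        let total: usize = 100 + 50 + 300;
        let max_row_group_size = 200;
        let groups: Vec<usize> = (0..total)
            .step_by(max_row_group_size)
            .map(|start| std::cmp::min(max_row_group_size, total - start))
            .collect();
        assert_eq!(groups, vec![200, 200, 50]);
    }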
3597
3598    #[test]
3599    fn complex_aggregate() {
3600        // Tests aggregating nested data
3601        let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
3602        let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
3603        let struct_a = Arc::new(Field::new(
3604            "struct_a",
3605            DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
3606            true,
3607        ));
3608
3609        let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
3610        let struct_b = Arc::new(Field::new(
3611            "struct_b",
3612            DataType::Struct(vec![list_a.clone()].into()),
3613            false,
3614        ));
3615
3616        let schema = Arc::new(Schema::new(vec![struct_b]));
3617
3618        // create nested data
3619        let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
3620        let field_b_array =
3621            Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);
3622
3623        let struct_a_array = StructArray::from(vec![
3624            (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
3625            (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
3626        ]);
3627
3628        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3629            .len(5)
3630            .add_buffer(Buffer::from_iter(vec![
3631                0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
3632            ]))
3633            .null_bit_buffer(Some(Buffer::from_iter(vec![
3634                true, false, true, false, true,
3635            ])))
3636            .child_data(vec![struct_a_array.into_data()])
3637            .build()
3638            .unwrap();
3639
3640        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3641        let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);
3642
3643        let batch1 =
3644            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3645                .unwrap();
3646
3647        let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
3648        let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);
3649
3650        let struct_a_array = StructArray::from(vec![
3651            (field_a, Arc::new(field_a_array) as ArrayRef),
3652            (field_b, Arc::new(field_b_array) as ArrayRef),
3653        ]);
3654
3655        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3656            .len(2)
3657            .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
3658            .child_data(vec![struct_a_array.into_data()])
3659            .build()
3660            .unwrap();
3661
3662        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3663        let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);
3664
3665        let batch2 =
3666            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3667                .unwrap();
3668
3669        let batches = &[batch1, batch2];
3670
3671        // Verify data is as expected
3672
3673        let expected = r#"
3674            +-------------------------------------------------------------------------------------------------------+
3675            | struct_b                                                                                              |
3676            +-------------------------------------------------------------------------------------------------------+
3677            | {list: [{leaf_a: 1, leaf_b: 1}]}                                                                      |
3678            | {list: }                                                                                              |
3679            | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]}                                               |
3680            | {list: }                                                                                              |
3681            | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]}                                                |
3682            | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
3683            | {list: [{leaf_a: 10, leaf_b: }]}                                                                      |
3684            +-------------------------------------------------------------------------------------------------------+
3685        "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");
3686
3687        let actual = pretty_format_batches(batches).unwrap().to_string();
3688        assert_eq!(actual, expected);
3689
3690        // Write data
3691        let file = tempfile::tempfile().unwrap();
3692        let props = WriterProperties::builder()
3693            .set_max_row_group_size(6)
3694            .build();
3695
3696        let mut writer =
3697            ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
3698
3699        for batch in batches {
3700            writer.write(batch).unwrap();
3701        }
3702        writer.close().unwrap();
3703
3704        // Read Data
3705        // Should have written entire first batch and first row of second to the first row group
3706        // leaving a single row in the second row group
3707
3708        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3709        assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);
3710
3711        let batches = builder
3712            .with_batch_size(2)
3713            .build()
3714            .unwrap()
3715            .collect::<ArrowResult<Vec<_>>>()
3716            .unwrap();
3717
3718        assert_eq!(batches.len(), 4);
3719        let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3720        assert_eq!(&batch_counts, &[2, 2, 2, 1]);
3721
3722        let actual = pretty_format_batches(&batches).unwrap().to_string();
3723        assert_eq!(actual, expected);
3724    }
3725
3726    #[test]
3727    fn test_arrow_writer_metadata() {
3728        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3729        let file_schema = batch_schema.clone().with_metadata(
3730            vec![("foo".to_string(), "bar".to_string())]
3731                .into_iter()
3732                .collect(),
3733        );
3734
3735        let batch = RecordBatch::try_new(
3736            Arc::new(batch_schema),
3737            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3738        )
3739        .unwrap();
3740
3741        let mut buf = Vec::with_capacity(1024);
3742        let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
3743        writer.write(&batch).unwrap();
3744        writer.close().unwrap();
3745    }
3746
3747    #[test]
3748    fn test_arrow_writer_nullable() {
3749        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3750        let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
3751        let file_schema = Arc::new(file_schema);
3752
3753        let batch = RecordBatch::try_new(
3754            Arc::new(batch_schema),
3755            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3756        )
3757        .unwrap();
3758
3759        let mut buf = Vec::with_capacity(1024);
3760        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3761        writer.write(&batch).unwrap();
3762        writer.close().unwrap();
3763
3764        let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
3765        let back = read.next().unwrap().unwrap();
3766        assert_eq!(back.schema(), file_schema);
3767        assert_ne!(back.schema(), batch.schema());
3768        assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
3769    }
3770
3771    #[test]
3772    fn in_progress_accounting() {
3773        // define schema
3774        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3775
3776        // create some data
3777        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3778
3779        // build a record batch
3780        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
3781
3782        let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
3783
3784        // starts empty
3785        assert_eq!(writer.in_progress_size(), 0);
3786        assert_eq!(writer.in_progress_rows(), 0);
3787        assert_eq!(writer.memory_size(), 0);
3788        assert_eq!(writer.bytes_written(), 4); // Initial header
3789        writer.write(&batch).unwrap();
3790
3791        // updated on write
3792        let initial_size = writer.in_progress_size();
3793        assert!(initial_size > 0);
3794        assert_eq!(writer.in_progress_rows(), 5);
3795        let initial_memory = writer.memory_size();
3796        assert!(initial_memory > 0);
3797        // memory estimate is at least as large as the estimated encoded size
3798        assert!(
3799            initial_size <= initial_memory,
3800            "{initial_size} <= {initial_memory}"
3801        );
3802
3803        // updated on second write
3804        writer.write(&batch).unwrap();
3805        assert!(writer.in_progress_size() > initial_size);
3806        assert_eq!(writer.in_progress_rows(), 10);
3807        assert!(writer.memory_size() > initial_memory);
3808        assert!(
3809            writer.in_progress_size() <= writer.memory_size(),
3810            "in_progress_size {} <= memory_size {}",
3811            writer.in_progress_size(),
3812            writer.memory_size()
3813        );
3814
3815        // in progress tracking is cleared, but the overall data written is updated
3816        let pre_flush_bytes_written = writer.bytes_written();
3817        writer.flush().unwrap();
3818        assert_eq!(writer.in_progress_size(), 0);
3819        assert_eq!(writer.memory_size(), 0);
3820        assert!(writer.bytes_written() > pre_flush_bytes_written);
3821
3822        writer.close().unwrap();
3823    }
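
    // A minimal sketch, using only the accounting methods exercised above, of the
    // usual pattern they enable: flush a row group once the in-progress size
    // crosses a caller-chosen threshold. The zero threshold is hypothetical and
    // just forces a flush after every batch.
    #[test]
    fn in_progress_size_based_flush() {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])) as ArrayRef],
        )
        .unwrap();

        let mut writer = ArrowWriter::try_new(Vec::<u8>::new(), schema, None).unwrap();
        let flush_threshold = 0; // hypothetical threshold in bytes
        for _ in 0..3 {
            writer.write(&batch).unwrap();
            if writer.in_progress_size() > flush_threshold {
                writer.flush().unwrap();
                assert_eq!(writer.in_progress_size(), 0);
            }
        }

        let metadata = writer.close().unwrap();
        // each flush produced a row group
        assert_eq!(metadata.row_groups.len(), 3);
    }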
3824
3825    #[test]
3826    fn test_writer_all_null() {
3827        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3828        let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
3829        let batch = RecordBatch::try_from_iter(vec![
3830            ("a", Arc::new(a) as ArrayRef),
3831            ("b", Arc::new(b) as ArrayRef),
3832        ])
3833        .unwrap();
3834
3835        let mut buf = Vec::with_capacity(1024);
3836        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
3837        writer.write(&batch).unwrap();
3838        writer.close().unwrap();
3839
3840        let bytes = Bytes::from(buf);
3841        let options = ReadOptionsBuilder::new().with_page_index().build();
3842        let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
3843        let index = reader.metadata().offset_index().unwrap();
3844
3845        assert_eq!(index.len(), 1);
3846        assert_eq!(index[0].len(), 2); // 2 columns
3847        assert_eq!(index[0][0].page_locations().len(), 1); // 1 page
3848        assert_eq!(index[0][1].page_locations().len(), 1); // 1 page
3849    }
3850
3851    #[test]
3852    fn test_disabled_statistics_with_page() {
3853        let file_schema = Schema::new(vec![
3854            Field::new("a", DataType::Utf8, true),
3855            Field::new("b", DataType::Utf8, true),
3856        ]);
3857        let file_schema = Arc::new(file_schema);
3858
3859        let batch = RecordBatch::try_new(
3860            file_schema.clone(),
3861            vec![
3862                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3863                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3864            ],
3865        )
3866        .unwrap();
3867
3868        let props = WriterProperties::builder()
3869            .set_statistics_enabled(EnabledStatistics::None)
3870            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
3871            .build();
3872
3873        let mut buf = Vec::with_capacity(1024);
3874        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
3875        writer.write(&batch).unwrap();
3876
3877        let metadata = writer.close().unwrap();
3878        assert_eq!(metadata.row_groups.len(), 1);
3879        let row_group = &metadata.row_groups[0];
3880        assert_eq!(row_group.columns.len(), 2);
3881        // Column "a" has both offset and column index, as requested
3882        assert!(row_group.columns[0].offset_index_offset.is_some());
3883        assert!(row_group.columns[0].column_index_offset.is_some());
3884        // Column "b" should only have offset index
3885        assert!(row_group.columns[1].offset_index_offset.is_some());
3886        assert!(row_group.columns[1].column_index_offset.is_none());
3887
3888        let options = ReadOptionsBuilder::new().with_page_index().build();
3889        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
3890
3891        let row_group = reader.get_row_group(0).unwrap();
3892        let a_col = row_group.metadata().column(0);
3893        let b_col = row_group.metadata().column(1);
3894
3895        // Column chunk of column "a" should have chunk level statistics
3896        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
3897            let min = byte_array_stats.min_opt().unwrap();
3898            let max = byte_array_stats.max_opt().unwrap();
3899
3900            assert_eq!(min.as_bytes(), b"a");
3901            assert_eq!(max.as_bytes(), b"d");
3902        } else {
3903            panic!("expecting Statistics::ByteArray");
3904        }
3905
3906        // The column chunk for column "b" shouldn't have statistics
3907        assert!(b_col.statistics().is_none());
3908
3909        let offset_index = reader.metadata().offset_index().unwrap();
3910        assert_eq!(offset_index.len(), 1); // 1 row group
3911        assert_eq!(offset_index[0].len(), 2); // 2 columns
3912
3913        let column_index = reader.metadata().column_index().unwrap();
3914        assert_eq!(column_index.len(), 1); // 1 row group
3915        assert_eq!(column_index[0].len(), 2); // 2 columns
3916
3917        let a_idx = &column_index[0][0];
3918        assert!(matches!(a_idx, Index::BYTE_ARRAY(_)), "{a_idx:?}");
3919        let b_idx = &column_index[0][1];
3920        assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
3921    }
3922
3923    #[test]
3924    fn test_disabled_statistics_with_chunk() {
3925        let file_schema = Schema::new(vec![
3926            Field::new("a", DataType::Utf8, true),
3927            Field::new("b", DataType::Utf8, true),
3928        ]);
3929        let file_schema = Arc::new(file_schema);
3930
3931        let batch = RecordBatch::try_new(
3932            file_schema.clone(),
3933            vec![
3934                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3935                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3936            ],
3937        )
3938        .unwrap();
3939
3940        let props = WriterProperties::builder()
3941            .set_statistics_enabled(EnabledStatistics::None)
3942            .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
3943            .build();
3944
3945        let mut buf = Vec::with_capacity(1024);
3946        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
3947        writer.write(&batch).unwrap();
3948
3949        let metadata = writer.close().unwrap();
3950        assert_eq!(metadata.row_groups.len(), 1);
3951        let row_group = &metadata.row_groups[0];
3952        assert_eq!(row_group.columns.len(), 2);
3953        // Column "a" should only have offset index
3954        assert!(row_group.columns[0].offset_index_offset.is_some());
3955        assert!(row_group.columns[0].column_index_offset.is_none());
3956        // Column "b" should only have offset index
3957        assert!(row_group.columns[1].offset_index_offset.is_some());
3958        assert!(row_group.columns[1].column_index_offset.is_none());
3959
3960        let options = ReadOptionsBuilder::new().with_page_index().build();
3961        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
3962
3963        let row_group = reader.get_row_group(0).unwrap();
3964        let a_col = row_group.metadata().column(0);
3965        let b_col = row_group.metadata().column(1);
3966
3967        // Column chunk of column "a" should have chunk level statistics
3968        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
3969            let min = byte_array_stats.min_opt().unwrap();
3970            let max = byte_array_stats.max_opt().unwrap();
3971
3972            assert_eq!(min.as_bytes(), b"a");
3973            assert_eq!(max.as_bytes(), b"d");
3974        } else {
3975            panic!("expecting Statistics::ByteArray");
3976        }
3977
3978        // The column chunk for column "b" shouldn't have statistics
3979        assert!(b_col.statistics().is_none());
3980
3981        let column_index = reader.metadata().column_index().unwrap();
3982        assert_eq!(column_index.len(), 1); // 1 row group
3983        assert_eq!(column_index[0].len(), 2); // 2 columns
3984
3985        let a_idx = &column_index[0][0];
3986        assert!(matches!(a_idx, Index::NONE), "{a_idx:?}");
3987        let b_idx = &column_index[0][1];
3988        assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
3989    }
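
    // A minimal sketch (hypothetical helper) of the three statistics levels the
    // two tests above contrast: `None` writes no statistics, `Chunk` writes only
    // chunk-level statistics, and `Page` additionally populates the column index.
    #[allow(dead_code)]
    fn example_statistics_levels() -> [WriterProperties; 3] {
        [
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::None)
                .build(),
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Chunk)
                .build(),
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Page)
                .build(),
        ]
    }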
3990
3991    #[test]
3992    fn test_arrow_writer_skip_metadata() {
3993        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3994        let file_schema = Arc::new(batch_schema.clone());
3995
3996        let batch = RecordBatch::try_new(
3997            Arc::new(batch_schema),
3998            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3999        )
4000        .unwrap();
4001        let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
4002
4003        let mut buf = Vec::with_capacity(1024);
4004        let mut writer =
4005            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
4006        writer.write(&batch).unwrap();
4007        writer.close().unwrap();
4008
4009        let bytes = Bytes::from(buf);
4010        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
4011        assert_eq!(file_schema, *reader_builder.schema());
4012        if let Some(key_value_metadata) = reader_builder
4013            .metadata()
4014            .file_metadata()
4015            .key_value_metadata()
4016        {
4017            assert!(!key_value_metadata
4018                .iter()
4019                .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY));
4020        }
4021    }
4022
4023    #[test]
4024    fn mismatched_schemas() {
4025        let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
4026        let file_schema = Arc::new(Schema::new(vec![Field::new(
4027            "temperature",
4028            DataType::Float64,
4029            false,
4030        )]));
4031
4032        let batch = RecordBatch::try_new(
4033            Arc::new(batch_schema),
4034            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4035        )
4036        .unwrap();
4037
4038        let mut buf = Vec::with_capacity(1024);
4039        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
4040
4041        let err = writer.write(&batch).unwrap_err().to_string();
4042        assert_eq!(
4043            err,
4044            "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
4045        );
4046    }
4047
4048    #[test]
4049    // https://github.com/apache/arrow-rs/issues/6988
4050    fn test_roundtrip_empty_schema() {
4051        // create empty record batch with empty schema
4052        let empty_batch = RecordBatch::try_new_with_options(
4053            Arc::new(Schema::empty()),
4054            vec![],
4055            &RecordBatchOptions::default().with_row_count(Some(0)),
4056        )
4057        .unwrap();
4058
4059        // write to parquet
4060        let mut parquet_bytes: Vec<u8> = Vec::new();
4061        let mut writer =
4062            ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
4063        writer.write(&empty_batch).unwrap();
4064        writer.close().unwrap();
4065
4066        // read from parquet
4067        let bytes = Bytes::from(parquet_bytes);
4068        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
4069        assert_eq!(reader.schema(), &empty_batch.schema());
4070        let batches: Vec<_> = reader
4071            .build()
4072            .unwrap()
4073            .collect::<ArrowResult<Vec<_>>>()
4074            .unwrap();
4075        assert_eq!(batches.len(), 0);
4076    }
4077
4078    #[test]
4079    fn test_page_stats_not_written_by_default() {
4080        let string_field = Field::new("a", DataType::Utf8, false);
4081        let schema = Schema::new(vec![string_field]);
4082        let raw_string_values = vec!["Blart Versenwald III"];
4083        let string_values = StringArray::from(raw_string_values.clone());
4084        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();
4085
4086        let props = WriterProperties::builder()
4087            .set_statistics_enabled(EnabledStatistics::Page)
4088            .set_dictionary_enabled(false)
4089            .set_encoding(Encoding::PLAIN)
4090            .set_compression(crate::basic::Compression::UNCOMPRESSED)
4091            .build();
4092
4093        let mut file = roundtrip_opts(&batch, props);
4094
4095        // read file and decode page headers
4096        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
4097        let mut buf = vec![];
4098        file.seek(std::io::SeekFrom::Start(0)).unwrap();
4099        let read = file.read_to_end(&mut buf).unwrap();
4100        assert!(read > 0);
4101
4102        // decode first page header
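        // `&buf[4..]` skips the 4-byte "PAR1" magic at the start of the file; with
        // dictionary encoding disabled, the first data page header follows immediately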
4103        let first_page = &buf[4..];
4104        let mut prot = TCompactSliceInputProtocol::new(first_page);
4105        let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
4106        let stats = hdr.data_page_header.unwrap().statistics;
4107
4108        assert!(stats.is_none());
4109    }
4110
4111    #[test]
4112    fn test_page_stats_when_enabled() {
4113        let string_field = Field::new("a", DataType::Utf8, false);
4114        let schema = Schema::new(vec![string_field]);
4115        let raw_string_values = vec!["Blart Versenwald III", "Andrew Lamb"];
4116        let string_values = StringArray::from(raw_string_values.clone());
4117        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();
4118
4119        let props = WriterProperties::builder()
4120            .set_statistics_enabled(EnabledStatistics::Page)
4121            .set_dictionary_enabled(false)
4122            .set_encoding(Encoding::PLAIN)
4123            .set_write_page_header_statistics(true)
4124            .set_compression(crate::basic::Compression::UNCOMPRESSED)
4125            .build();
4126
4127        let mut file = roundtrip_opts(&batch, props);
4128
4129        // read file and decode page headers
4130        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
4131        let mut buf = vec![];
4132        file.seek(std::io::SeekFrom::Start(0)).unwrap();
4133        let read = file.read_to_end(&mut buf).unwrap();
4134        assert!(read > 0);
4135
4136        // decode first page header
4137        let first_page = &buf[4..];
4138        let mut prot = TCompactSliceInputProtocol::new(first_page);
4139        let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
4140        let stats = hdr.data_page_header.unwrap().statistics;
4141
4142        let stats = stats.unwrap();
4143        // check that min/max were actually written to the page
        assert!(stats.is_max_value_exact.unwrap());
        assert!(stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Blart Versenwald III".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Andrew Lamb".as_bytes());
    }

    #[test]
    fn test_page_stats_truncation() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let binary_field = Field::new("b", DataType::Binary, false);
        let schema = Schema::new(vec![string_field, binary_field]);

        let raw_string_values = vec!["Blart Versenwald III"];
        let raw_binary_values = [b"Blart Versenwald III".to_vec()];
        let raw_binary_value_refs = raw_binary_values
            .iter()
            .map(|x| x.as_slice())
            .collect::<Vec<_>>();

        let string_values = StringArray::from(raw_string_values.clone());
        let binary_values = BinaryArray::from(raw_binary_value_refs);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(string_values), Arc::new(binary_values)],
        )
        .unwrap();

        let props = WriterProperties::builder()
            .set_statistics_truncate_length(Some(2))
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_write_page_header_statistics(true)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let mut file = roundtrip_opts(&batch, props);

        // read file and decode page headers
        // Note: use the thrift API as there is no Rust API to access the statistics in the page headers
        let mut buf = vec![];
        file.seek(std::io::SeekFrom::Start(0)).unwrap();
        let read = file.read_to_end(&mut buf).unwrap();
        assert!(read > 0);

        // decode first page header
        let first_page = &buf[4..];
        let mut prot = TCompactSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;
        assert!(stats.is_some());
        let stats = stats.unwrap();
        // check that min/max were properly truncated
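        // with a truncate length of 2 the min is cut to its first two bytes ("Bl"), while the
        // truncated max has its last byte incremented ("Bm") so it still bounds the real value;
        // both are therefore marked as inexact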
        assert!(!stats.is_max_value_exact.unwrap());
        assert!(!stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());

        // check second page now
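        // the page data (`compressed_page_size` bytes) follows the header just parsed, so skipping
        // it from the protocol's current position lands on the next page header (the data page of
        // column "b")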
        let second_page = &prot.as_slice()[hdr.compressed_page_size as usize..];
        let mut prot = TCompactSliceInputProtocol::new(second_page);
        let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;
        assert!(stats.is_some());
        let stats = stats.unwrap();
        // check that min/max were properly truncated
        assert!(!stats.is_max_value_exact.unwrap());
        assert!(!stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
    }

    #[test]
    fn test_page_encoding_statistics_roundtrip() {
        let batch_schema = Schema::new(vec![Field::new(
            "int32",
            arrow_schema::DataType::Int32,
            false,
        )]);

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema.clone()),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let mut file: File = tempfile::tempfile().unwrap();
        let mut writer = ArrowWriter::try_new(&mut file, Arc::new(batch_schema), None).unwrap();
        writer.write(&batch).unwrap();
        let file_metadata = writer.close().unwrap();

        assert_eq!(file_metadata.row_groups.len(), 1);
        assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
        let chunk_meta = file_metadata.row_groups[0].columns[0]
            .meta_data
            .as_ref()
            .expect("column metadata missing");
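        // `encoding_stats` records, for each page type, the encoding used and how many pages used it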
        assert!(chunk_meta.encoding_stats.is_some());
        let chunk_page_stats = chunk_meta.encoding_stats.as_ref().unwrap();

        // check that the read metadata is also correct
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();

        let rowgroup = reader.get_row_group(0).expect("row group missing");
        assert_eq!(rowgroup.num_columns(), 1);
        let column = rowgroup.metadata().column(0);
        assert!(column.page_encoding_stats().is_some());
        let file_page_stats = column.page_encoding_stats().unwrap();
        let chunk_stats: Vec<PageEncodingStats> = chunk_page_stats
            .iter()
            .map(|x| crate::file::page_encoding_stats::try_from_thrift(x).unwrap())
            .collect();
        assert_eq!(&chunk_stats, file_page_stats);
    }

    #[test]
    fn test_different_dict_page_size_limit() {
        let array = Arc::new(Int64Array::from_iter(0..1024 * 1024));
        let schema = Arc::new(Schema::new(vec![
            Field::new("col0", arrow_schema::DataType::Int64, false),
            Field::new("col1", arrow_schema::DataType::Int64, false),
        ]));
        let batch =
            arrow_array::RecordBatch::try_new(schema.clone(), vec![array.clone(), array]).unwrap();

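        // col1 overrides the file-wide dictionary page size limit (1 MiB) with a 4 MiB per-column limit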
        let props = WriterProperties::builder()
            .set_dictionary_page_size_limit(1024 * 1024)
            .set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 1024 * 1024 * 4)
            .build();
        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
        writer.write(&batch).unwrap();
        let data = Bytes::from(writer.into_inner().unwrap());

        let mut metadata = ParquetMetaDataReader::new();
        metadata.try_parse(&data).unwrap();
        let metadata = metadata.finish().unwrap();
        let col0_meta = metadata.row_group(0).column(0);
        let col1_meta = metadata.row_group(0).column(1);

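        // read the first page of the column chunk, which should be its dictionary page,
        // and return the size of the encoded dictionary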
        let get_dict_page_size = move |meta: &ColumnChunkMetaData| {
            let mut reader =
                SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
            let page = reader.get_next_page().unwrap().unwrap();
            match page {
                Page::DictionaryPage { buf, .. } => buf.len(),
                _ => panic!("expected DictionaryPage"),
            }
        };

        assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
        assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
    }
}