Skip to main content

parquet/column/writer/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains column writer API.
19
20use bytes::Bytes;
21use half::f16;
22
23use crate::bloom_filter::Sbbf;
24use crate::file::page_index::column_index::ColumnIndexMetaData;
25use crate::file::page_index::offset_index::OffsetIndexMetaData;
26use std::collections::{BTreeSet, VecDeque};
27use std::str;
28
29use crate::basic::{
30    BoundaryOrder, Compression, ConvertedType, Encoding, EncodingMask, IntType, LogicalType,
31    PageType, Type,
32};
33use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
34use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
35use crate::compression::{Codec, CodecOptionsBuilder, create_codec};
36use crate::data_type::private::ParquetValueType;
37use crate::data_type::*;
38use crate::encodings::levels::LevelEncoder;
39#[cfg(feature = "encryption")]
40use crate::encryption::encrypt::get_column_crypto_metadata;
41use crate::errors::{ParquetError, Result};
42use crate::file::metadata::{
43    ColumnChunkMetaData, ColumnChunkMetaDataBuilder, ColumnIndexBuilder, LevelHistogram,
44    OffsetIndexBuilder, PageEncodingStats,
45};
46use crate::file::properties::{
47    EnabledStatistics, WriterProperties, WriterPropertiesPtr, WriterVersion,
48};
49use crate::file::statistics::{Statistics, ValueStatistics};
50use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
51
52mod byte_budget_chunker;
53pub(crate) mod encoder;
54
55use byte_budget_chunker::ByteBudgetChunker;
56
57macro_rules! downcast_writer {
58    ($e:expr, $i:ident, $b:expr) => {
59        match $e {
60            Self::BoolColumnWriter($i) => $b,
61            Self::Int32ColumnWriter($i) => $b,
62            Self::Int64ColumnWriter($i) => $b,
63            Self::Int96ColumnWriter($i) => $b,
64            Self::FloatColumnWriter($i) => $b,
65            Self::DoubleColumnWriter($i) => $b,
66            Self::ByteArrayColumnWriter($i) => $b,
67            Self::FixedLenByteArrayColumnWriter($i) => $b,
68        }
69    };
70}
71
72/// Column writer for a Parquet type.
73///
74/// See [`get_column_writer`] to create instances of this type
75pub enum ColumnWriter<'a> {
76    /// Column writer for boolean type
77    BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
78    /// Column writer for int32 type
79    Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
80    /// Column writer for int64 type
81    Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
82    /// Column writer for int96 (timestamp) type
83    Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
84    /// Column writer for float type
85    FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
86    /// Column writer for double type
87    DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
88    /// Column writer for byte array type
89    ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
90    /// Column writer for fixed length byte array type
91    FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
92}
93
94impl ColumnWriter<'_> {
95    /// Returns the estimated total memory usage
96    #[cfg(feature = "arrow")]
97    pub(crate) fn memory_size(&self) -> usize {
98        downcast_writer!(self, typed, typed.memory_size())
99    }
100
101    /// Returns the estimated total encoded bytes for this column writer
102    #[cfg(feature = "arrow")]
103    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
104        downcast_writer!(self, typed, typed.get_estimated_total_bytes())
105    }
106
107    /// Finalize the currently buffered values as a data page.
108    ///
109    /// This is used by content-defined chunking to force a page boundary at
110    /// content-determined positions.
111    #[cfg(feature = "arrow")]
112    pub(crate) fn add_data_page(&mut self) -> Result<()> {
113        downcast_writer!(self, typed, typed.add_data_page())
114    }
115
116    /// Close this [`ColumnWriter`], returning the metadata for the column chunk.
117    pub fn close(self) -> Result<ColumnCloseResult> {
118        downcast_writer!(self, typed, typed.close())
119    }
120}
121
122/// Create a specific column writer corresponding to column descriptor `descr`.
123pub fn get_column_writer<'a>(
124    descr: ColumnDescPtr,
125    props: WriterPropertiesPtr,
126    page_writer: Box<dyn PageWriter + 'a>,
127) -> ColumnWriter<'a> {
128    match descr.physical_type() {
129        Type::BOOLEAN => {
130            ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
131        }
132        Type::INT32 => {
133            ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
134        }
135        Type::INT64 => {
136            ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
137        }
138        Type::INT96 => {
139            ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
140        }
141        Type::FLOAT => {
142            ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
143        }
144        Type::DOUBLE => {
145            ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
146        }
147        Type::BYTE_ARRAY => {
148            ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
149        }
150        Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter(
151            ColumnWriterImpl::new(descr, props, page_writer),
152        ),
153    }
154}
155
156/// Gets a typed column writer for the specific type `T`, by "up-casting" `col_writer` of
157/// non-generic type to a generic column writer type `ColumnWriterImpl`.
158///
159/// Panics if actual enum value for `col_writer` does not match the type `T`.
160pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> {
161    T::get_column_writer(col_writer).unwrap_or_else(|| {
162        panic!(
163            "Failed to convert column writer into a typed column writer for `{}` type",
164            T::get_physical_type()
165        )
166    })
167}
168
169/// Similar to `get_typed_column_writer` but returns a reference.
170pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
171    col_writer: &'b ColumnWriter<'a>,
172) -> &'b ColumnWriterImpl<'a, T> {
173    T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
174        panic!(
175            "Failed to convert column writer into a typed column writer for `{}` type",
176            T::get_physical_type()
177        )
178    })
179}
180
181/// Similar to `get_typed_column_writer` but returns a reference.
182pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
183    col_writer: &'a mut ColumnWriter<'b>,
184) -> &'a mut ColumnWriterImpl<'b, T> {
185    T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
186        panic!(
187            "Failed to convert column writer into a typed column writer for `{}` type",
188            T::get_physical_type()
189        )
190    })
191}
192
193/// Metadata for a column chunk of a Parquet file.
194///
195/// Note this structure is returned by [`ColumnWriter::close`].
196#[derive(Debug, Clone)]
197pub struct ColumnCloseResult {
198    /// The total number of bytes written
199    pub bytes_written: u64,
200    /// The total number of rows written
201    pub rows_written: u64,
202    /// Metadata for this column chunk
203    pub metadata: ColumnChunkMetaData,
204    /// Optional bloom filter for this column
205    pub bloom_filter: Option<Sbbf>,
206    /// Optional column index, for filtering
207    pub column_index: Option<ColumnIndexMetaData>,
208    /// Optional offset index, identifying page locations
209    pub offset_index: Option<OffsetIndexMetaData>,
210}
211
212impl ColumnCloseResult {
213    /// Rewrite the page offsets for a dictionary-first on-disk layout.
214    ///
215    /// A writer that buffers the whole column chunk and splices it later (the
216    /// Arrow path) may accept the data pages *before* the dictionary page so the
217    /// data pages can stream straight through, then emit the dictionary page
218    /// first at splice. The offsets recorded during encoding therefore assume a
219    /// data-pages-first layout; call this with the serialized length of the
220    /// dictionary page to move it to offset 0 and shift every data page after
221    /// it. A `dictionary_len` of 0 (no dictionary page) leaves the result
222    /// unchanged.
223    pub fn update_dictionary_location(mut self, dictionary_len: usize) -> Result<Self> {
224        if dictionary_len > 0 {
225            self.metadata = self
226                .metadata
227                .into_builder()
228                .set_dictionary_page_offset(Some(0))
229                .set_data_page_offset(dictionary_len as i64)
230                .build()?;
231            if let Some(offset_index) = self.offset_index.as_mut() {
232                let mut offset = dictionary_len as i64;
233                for location in offset_index.page_locations.iter_mut() {
234                    location.offset = offset;
235                    offset += location.compressed_page_size as i64;
236                }
237            }
238        }
239        Ok(self)
240    }
241}
242
243// Metrics per page
244#[derive(Default)]
245struct PageMetrics {
246    num_buffered_values: u32,
247    num_buffered_rows: u32,
248    num_page_nulls: u64,
249    repetition_level_histogram: Option<LevelHistogram>,
250    definition_level_histogram: Option<LevelHistogram>,
251}
252
253impl PageMetrics {
254    fn new() -> Self {
255        Default::default()
256    }
257
258    /// Initialize the repetition level histogram
259    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
260        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
261        self
262    }
263
264    /// Initialize the definition level histogram
265    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
266        self.definition_level_histogram = LevelHistogram::try_new(max_level);
267        self
268    }
269
270    /// Resets the state of this `PageMetrics` to the initial state.
271    /// If histograms have been initialized their contents will be reset to zero.
272    fn new_page(&mut self) {
273        self.num_buffered_values = 0;
274        self.num_buffered_rows = 0;
275        self.num_page_nulls = 0;
276        self.repetition_level_histogram
277            .as_mut()
278            .map(LevelHistogram::reset);
279        self.definition_level_histogram
280            .as_mut()
281            .map(LevelHistogram::reset);
282    }
283}
284
285// Metrics per column writer
286#[derive(Default)]
287struct ColumnMetrics<T: Default> {
288    total_bytes_written: u64,
289    total_rows_written: u64,
290    total_uncompressed_size: u64,
291    total_compressed_size: u64,
292    total_num_values: u64,
293    dictionary_page_offset: Option<u64>,
294    data_page_offset: Option<u64>,
295    min_column_value: Option<T>,
296    max_column_value: Option<T>,
297    num_column_nulls: u64,
298    column_distinct_count: Option<u64>,
299    variable_length_bytes: Option<i64>,
300    repetition_level_histogram: Option<LevelHistogram>,
301    definition_level_histogram: Option<LevelHistogram>,
302}
303
304impl<T: Default> ColumnMetrics<T> {
305    fn new() -> Self {
306        Default::default()
307    }
308
309    /// Initialize the repetition level histogram
310    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
311        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
312        self
313    }
314
315    /// Initialize the definition level histogram
316    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
317        self.definition_level_histogram = LevelHistogram::try_new(max_level);
318        self
319    }
320
321    /// Sum `page_histogram` into `chunk_histogram`
322    fn update_histogram(
323        chunk_histogram: &mut Option<LevelHistogram>,
324        page_histogram: &Option<LevelHistogram>,
325    ) {
326        if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
327            chunk_hist.add(page_hist);
328        }
329    }
330
331    /// Sum the provided PageMetrics histograms into the chunk histograms. Does nothing if
332    /// page histograms are not initialized.
333    fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
334        ColumnMetrics::<T>::update_histogram(
335            &mut self.definition_level_histogram,
336            &page_metrics.definition_level_histogram,
337        );
338        ColumnMetrics::<T>::update_histogram(
339            &mut self.repetition_level_histogram,
340            &page_metrics.repetition_level_histogram,
341        );
342    }
343
344    /// Sum the provided page variable_length_bytes into the chunk variable_length_bytes
345    fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
346        if let Some(var_bytes) = variable_length_bytes {
347            *self.variable_length_bytes.get_or_insert(0) += var_bytes;
348        }
349    }
350}
351
352/// Borrowed view of level data, analogous to `&str` for `LevelData`'s `String`.
353///
354/// `LevelDataRef` can be constructed from `LevelData` and directly from an existing
355/// `&[i16]` without allocating.
356///
357/// The variants are different physical representations of the same logical
358/// sequence of levels.
359#[derive(Debug, Clone, Copy)]
360pub(crate) enum LevelDataRef<'a> {
361    Absent,
362    Materialized(&'a [i16]),
363    Uniform { value: i16, count: usize },
364}
365
366impl<'a> From<&'a [i16]> for LevelDataRef<'a> {
367    fn from(levels: &'a [i16]) -> Self {
368        Self::Materialized(levels)
369    }
370}
371
372impl<'a> From<Option<&'a [i16]>> for LevelDataRef<'a> {
373    fn from(levels: Option<&'a [i16]>) -> Self {
374        levels.map_or(Self::Absent, Self::from)
375    }
376}
377
378impl<'a> LevelDataRef<'a> {
379    pub(crate) fn len(self) -> usize {
380        match self {
381            Self::Absent => 0,
382            Self::Materialized(values) => values.len(),
383            Self::Uniform { count, .. } => count,
384        }
385    }
386
387    pub(crate) fn first(self) -> Option<i16> {
388        match self {
389            Self::Absent => None,
390            Self::Materialized(values) => values.first().copied(),
391            Self::Uniform { value, count } => (count > 0).then_some(value),
392        }
393    }
394
395    #[cfg(feature = "arrow")]
396    pub(crate) fn value_at(self, idx: usize) -> Option<i16> {
397        match self {
398            Self::Absent => None,
399            Self::Materialized(values) => values.get(idx).copied(),
400            Self::Uniform { value, count } => (idx < count).then_some(value),
401        }
402    }
403
404    pub(crate) fn slice(self, offset: usize, len: usize) -> Self {
405        match self {
406            Self::Absent => Self::Absent,
407            Self::Materialized(values) => Self::Materialized(&values[offset..offset + len]),
408            Self::Uniform { value, .. } => Self::Uniform { value, count: len },
409        }
410    }
411
412    /// Count of positions in this slice that represent an actual value
413    /// (definition level equal to `max_def`). `Absent` means the column has
414    /// `max_def == 0` and every position is a value, so the implicit count
415    /// is the caller-supplied `total`.
416    pub(crate) fn value_count(self, total: usize, max_def: i16) -> usize {
417        match self {
418            Self::Absent => total,
419            Self::Materialized(values) => values.iter().filter(|&&d| d == max_def).count(),
420            Self::Uniform { value, count } => {
421                if value == max_def {
422                    count
423                } else {
424                    0
425                }
426            }
427        }
428    }
429}
430
431/// Typed column writer for a primitive column.
432pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
433
434/// Generic column writer for a primitive Parquet column
435pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
436    // Column writer properties
437    descr: ColumnDescPtr,
438    props: WriterPropertiesPtr,
439    statistics_enabled: EnabledStatistics,
440
441    page_writer: Box<dyn PageWriter + 'a>,
442    codec: Compression,
443    compressor: Option<Box<dyn Codec>>,
444    encoder: E,
445
446    page_metrics: PageMetrics,
447    // Metrics per column writer
448    column_metrics: ColumnMetrics<E::T>,
449
450    /// The order of encodings within the generated metadata does not impact its meaning,
451    /// but we use a BTreeSet so that the output is deterministic
452    encodings: BTreeSet<Encoding>,
453    encoding_stats: Vec<PageEncodingStats>,
454    // Streaming level encoders for definition/repetition levels.
455    def_levels_encoder: LevelEncoder,
456    rep_levels_encoder: LevelEncoder,
457    data_pages: VecDeque<CompressedPage>,
458    // column index and offset index
459    column_index_builder: ColumnIndexBuilder,
460    offset_index_builder: Option<OffsetIndexBuilder>,
461
462    // Below fields used to incrementally check boundary order across data pages.
463    // We assume they are ascending/descending until proven wrong.
464    data_page_boundary_ascending: bool,
465    data_page_boundary_descending: bool,
466    /// (min, max)
467    last_non_null_data_page_min_max: Option<(E::T, E::T)>,
468}
469
470impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
471    /// Returns a new instance of [`GenericColumnWriter`].
472    pub fn new(
473        descr: ColumnDescPtr,
474        props: WriterPropertiesPtr,
475        page_writer: Box<dyn PageWriter + 'a>,
476    ) -> Self {
477        let codec = props.compression(descr.path());
478        let codec_options = CodecOptionsBuilder::default().build();
479        let compressor = create_codec(codec, &codec_options).unwrap();
480        let encoder = E::try_new(&descr, props.as_ref()).unwrap();
481
482        let statistics_enabled = props.statistics_enabled(descr.path());
483
484        let mut encodings = BTreeSet::new();
485        // Used for level information
486        encodings.insert(Encoding::RLE);
487
488        let mut page_metrics = PageMetrics::new();
489        let mut column_metrics = ColumnMetrics::<E::T>::new();
490
491        // Initialize level histograms if collecting page or chunk statistics
492        if statistics_enabled != EnabledStatistics::None {
493            page_metrics = page_metrics
494                .with_repetition_level_histogram(descr.max_rep_level())
495                .with_definition_level_histogram(descr.max_def_level());
496            column_metrics = column_metrics
497                .with_repetition_level_histogram(descr.max_rep_level())
498                .with_definition_level_histogram(descr.max_def_level())
499        }
500
501        // Disable column_index_builder if not collecting page statistics.
502        let mut column_index_builder = ColumnIndexBuilder::new(descr.physical_type());
503        if statistics_enabled != EnabledStatistics::Page {
504            column_index_builder.to_invalid()
505        }
506
507        // Disable offset_index_builder if requested by user.
508        let offset_index_builder = match props.offset_index_disabled() {
509            false => Some(OffsetIndexBuilder::new()),
510            _ => None,
511        };
512
513        Self {
514            def_levels_encoder: Self::create_level_encoder(descr.max_def_level(), &props),
515            rep_levels_encoder: Self::create_level_encoder(descr.max_rep_level(), &props),
516            descr,
517            props,
518            statistics_enabled,
519            page_writer,
520            codec,
521            compressor,
522            encoder,
523            data_pages: VecDeque::new(),
524            page_metrics,
525            column_metrics,
526            column_index_builder,
527            offset_index_builder,
528            encodings,
529            encoding_stats: vec![],
530            data_page_boundary_ascending: true,
531            data_page_boundary_descending: true,
532            last_non_null_data_page_min_max: None,
533        }
534    }
535
536    #[allow(clippy::too_many_arguments)]
537    pub(crate) fn write_batch_internal(
538        &mut self,
539        values: &E::Values,
540        value_indices: Option<&[usize]>,
541        def_levels: LevelDataRef<'_>,
542        rep_levels: LevelDataRef<'_>,
543        min: Option<&E::T>,
544        max: Option<&E::T>,
545        distinct_count: Option<u64>,
546    ) -> Result<usize> {
547        // Check if number of definition levels is the same as number of repetition levels.
548        if def_levels.len() != 0 && rep_levels.len() != 0 && def_levels.len() != rep_levels.len() {
549            return Err(general_err!(
550                "Inconsistent length of definition and repetition levels: {} != {}",
551                def_levels.len(),
552                rep_levels.len()
553            ));
554        }
555
556        // We check for DataPage limits only after we have inserted the values. If a user
557        // writes a large number of values, the DataPage size can be well above the limit.
558        //
559        // The purpose of this chunking is to bound this. Even if a user writes large
560        // number of values, the chunking will ensure that we add data page at a
561        // reasonable pagesize limit.
562
563        // TODO: find out why we don't account for size of levels when we estimate page
564        // size.
565        let num_levels = def_levels.len().max(rep_levels.len());
566        let num_levels = if num_levels > 0 {
567            num_levels
568        } else {
569            value_indices.map_or(values.len(), |i| i.len())
570        };
571
572        if let Some(min) = min {
573            update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
574        }
575        if let Some(max) = max {
576            update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
577        }
578
579        // We can only set the distinct count if there are no other writes
580        if self.encoder.num_values() == 0 {
581            self.column_metrics.column_distinct_count = distinct_count;
582        } else {
583            self.column_metrics.column_distinct_count = None;
584        }
585
586        let mut values_offset = 0;
587        let mut levels_offset = 0;
588        let both_levels_compact = !matches!(def_levels, LevelDataRef::Materialized(_))
589            && !matches!(rep_levels, LevelDataRef::Materialized(_));
590        let has_levels = !matches!(def_levels, LevelDataRef::Absent)
591            || !matches!(rep_levels, LevelDataRef::Absent);
592        // When both level vectors are compact (Uniform or Absent), there is no
593        // materialized slice to split and the per-mini-batch work is O(1), so we
594        // can safely use a much larger batch size.
595        let base_batch_size = if both_levels_compact && has_levels {
596            self.props.data_page_row_count_limit()
597        } else {
598            self.props.write_batch_size()
599        };
600        let chunker = ByteBudgetChunker::new(&self.descr, &self.props, base_batch_size);
601        while levels_offset < num_levels {
602            let mut end_offset = num_levels.min(levels_offset + base_batch_size);
603
604            // Split at record boundary
605            if let LevelDataRef::Materialized(levels) = rep_levels {
606                while end_offset < levels.len() && levels[end_offset] != 0 {
607                    end_offset += 1;
608                }
609            }
610
611            let chunk_size = end_offset - levels_offset;
612            let chunk_def = def_levels.slice(levels_offset, chunk_size);
613            let chunk_rep = rep_levels.slice(levels_offset, chunk_size);
614
615            // Key decision point: can we write this whole chunk as one
616            // mini-batch (the common case — small or fixed-width values, no
617            // further page-size accounting needed), or must we fall back to
618            // byte-budget-aware sub-batching to keep a page from overshooting
619            // `data_page_size_limit`? `pick_sub_batch_size` returns
620            // `chunk_size` for the former.
621            let sub_batch_size = chunker.pick_sub_batch_size(
622                &self.encoder,
623                values,
624                value_indices,
625                chunk_def,
626                values_offset,
627                chunk_size,
628            );
629
630            if sub_batch_size >= chunk_size {
631                values_offset += self.write_mini_batch(
632                    values,
633                    values_offset,
634                    value_indices,
635                    chunk_size,
636                    chunk_def,
637                    chunk_rep,
638                )?;
639            } else {
640                values_offset += self.write_granular_chunk(
641                    values,
642                    values_offset,
643                    value_indices,
644                    chunk_size,
645                    chunk_def,
646                    chunk_rep,
647                    sub_batch_size,
648                )?;
649            }
650            levels_offset = end_offset;
651        }
652
653        // Return total number of values processed.
654        Ok(values_offset)
655    }
656
657    /// Writes batch of values, definition levels and repetition levels.
658    /// Returns number of values processed (written).
659    ///
660    /// If definition and repetition levels are provided, we write fully those levels and
661    /// select how many values to write (this number will be returned), since number of
662    /// actual written values may be smaller than provided values.
663    ///
664    /// If only values are provided, then all values are written and the length of
665    /// of the values buffer is returned.
666    ///
667    /// Definition and/or repetition levels can be omitted, if values are
668    /// non-nullable and/or non-repeated.
669    pub fn write_batch(
670        &mut self,
671        values: &E::Values,
672        def_levels: Option<&[i16]>,
673        rep_levels: Option<&[i16]>,
674    ) -> Result<usize> {
675        self.write_batch_internal(
676            values,
677            None,
678            LevelDataRef::from(def_levels),
679            LevelDataRef::from(rep_levels),
680            None,
681            None,
682            None,
683        )
684    }
685
686    /// Writer may optionally provide pre-calculated statistics for use when computing
687    /// chunk-level statistics
688    ///
689    /// NB: [`WriterProperties::statistics_enabled`] must be set to [`EnabledStatistics::Chunk`]
690    /// for these statistics to take effect. If [`EnabledStatistics::None`] they will be ignored,
691    /// and if [`EnabledStatistics::Page`] the chunk statistics will instead be computed from the
692    /// computed page statistics
693    pub fn write_batch_with_statistics(
694        &mut self,
695        values: &E::Values,
696        def_levels: Option<&[i16]>,
697        rep_levels: Option<&[i16]>,
698        min: Option<&E::T>,
699        max: Option<&E::T>,
700        distinct_count: Option<u64>,
701    ) -> Result<usize> {
702        self.write_batch_internal(
703            values,
704            None,
705            LevelDataRef::from(def_levels),
706            LevelDataRef::from(rep_levels),
707            min,
708            max,
709            distinct_count,
710        )
711    }
712
713    /// Returns the estimated total memory usage.
714    ///
715    /// Unlike [`Self::get_estimated_total_bytes`] this is an estimate
716    /// of the current memory usage and not the final anticipated encoded size.
717    #[cfg(feature = "arrow")]
718    pub(crate) fn memory_size(&self) -> usize {
719        // In-flight encoder buffers, plus any completed pages still held on the
720        // heap: the dictionary-column data pages buffered here (column-at-a-time
721        // path), plus whatever the page writer keeps resident. A page writer
722        // that spills completed pages off-heap reports far less than the bytes
723        // it was handed, so this tracks real memory rather than bytes written.
724        self.encoder.estimated_memory_size()
725            + self
726                .data_pages
727                .iter()
728                .map(|page| page.memory_usage())
729                .sum::<usize>()
730            + self.page_writer.buffered_memory_size()
731    }
732
733    /// Returns total number of bytes written by this column writer so far.
734    /// This value is also returned when column writer is closed.
735    ///
736    /// Note: this value does not include any buffered data that has not
737    /// yet been flushed to a page.
738    pub fn get_total_bytes_written(&self) -> u64 {
739        self.column_metrics.total_bytes_written
740    }
741
742    /// Returns the estimated total encoded bytes for this column writer.
743    ///
744    /// Unlike [`Self::get_total_bytes_written`] this includes an estimate
745    /// of any data that has not yet been flushed to a page, based on it's
746    /// anticipated encoded size.
747    #[cfg(feature = "arrow")]
748    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
749        self.data_pages
750            .iter()
751            .map(|page| page.data().len() as u64)
752            .sum::<u64>()
753            + self.column_metrics.total_bytes_written
754            + self.encoder.estimated_data_page_size() as u64
755            + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64
756    }
757
758    /// Returns total number of rows written by this column writer so far.
759    /// This value is also returned when column writer is closed.
760    pub fn get_total_rows_written(&self) -> u64 {
761        self.column_metrics.total_rows_written
762    }
763
764    /// Returns a reference to a [`ColumnDescPtr`]
765    pub fn get_descriptor(&self) -> &ColumnDescPtr {
766        &self.descr
767    }
768
769    /// Finalizes writes and closes the column writer.
770    /// Returns total bytes written, total rows written and column chunk metadata.
771    pub fn close(mut self) -> Result<ColumnCloseResult> {
772        if self.page_metrics.num_buffered_values > 0 {
773            self.add_data_page()?;
774        }
775        if self.encoder.has_dictionary() {
776            self.write_dictionary_page()?;
777        }
778        self.flush_data_pages()?;
779        let metadata = self.build_column_metadata()?;
780        self.page_writer.close()?;
781
782        let boundary_order = match (
783            self.data_page_boundary_ascending,
784            self.data_page_boundary_descending,
785        ) {
786            // If the lists are composed of equal elements then will be marked as ascending
787            // (Also the case if all pages are null pages)
788            (true, _) => BoundaryOrder::ASCENDING,
789            (false, true) => BoundaryOrder::DESCENDING,
790            (false, false) => BoundaryOrder::UNORDERED,
791        };
792        self.column_index_builder.set_boundary_order(boundary_order);
793
794        let column_index = match self.column_index_builder.valid() {
795            true => Some(self.column_index_builder.build()?),
796            false => None,
797        };
798
799        let offset_index = self.offset_index_builder.map(|b| b.build());
800
801        Ok(ColumnCloseResult {
802            bytes_written: self.column_metrics.total_bytes_written,
803            rows_written: self.column_metrics.total_rows_written,
804            bloom_filter: self.encoder.flush_bloom_filter(),
805            metadata,
806            column_index,
807            offset_index,
808        })
809    }
810
811    /// Writes a chunk in `sub_batch_size`-level sub-batches, checking the
812    /// data page byte limit after each. This keeps the page size close to
813    /// `data_page_size_limit` instead of overshooting it by a whole chunk.
814    ///
815    /// For repeated/nested columns sub-batches step from one `rep == 0`
816    /// boundary to the next so a record never spans data pages, matching
817    /// the parquet format rule.
818    ///
819    /// Returns the total number of values consumed across all sub-batches.
820    ///
821    /// `#[inline(never)]` keeps this slow path — only reached for
822    /// variable-width columns whose values need page splitting — out of
823    /// the hot `write_batch_internal` loop.
824    #[allow(clippy::too_many_arguments)]
825    #[inline(never)]
826    fn write_granular_chunk(
827        &mut self,
828        values: &E::Values,
829        values_offset: usize,
830        value_indices: Option<&[usize]>,
831        chunk_size: usize,
832        chunk_def: LevelDataRef<'_>,
833        chunk_rep: LevelDataRef<'_>,
834        sub_batch_size: usize,
835    ) -> Result<usize> {
836        // The chunker always sizes a sub-batch to at least one level, so each
837        // iteration below makes progress (`sub_end > sub_start`).
838        debug_assert!(sub_batch_size >= 1, "chunker must size at least one level");
839        let mut values_consumed = 0;
840        let mut sub_start = 0;
841        while sub_start < chunk_size {
842            let sub_end = match chunk_rep {
843                LevelDataRef::Materialized(levels) => {
844                    // Pack up to `sub_batch_size` levels per mini-batch, then
845                    // extend to the next record boundary (rep == 0) so a
846                    // record never spans data pages. Packing whole records
847                    // rather than stepping one record at a time avoids
848                    // calling `write_mini_batch` per record: records average
849                    // only a handful of levels, so a record-at-a-time step
850                    // would issue many more mini-batches than necessary.
851                    let mut e = (sub_start + sub_batch_size).min(chunk_size);
852                    while e < chunk_size && levels[e] != 0 {
853                        e += 1;
854                    }
855                    e
856                }
857                _ => (sub_start + sub_batch_size).min(chunk_size),
858            };
859            let sub_len = sub_end - sub_start;
860            let written = self.write_mini_batch(
861                values,
862                values_offset + values_consumed,
863                value_indices,
864                sub_len,
865                chunk_def.slice(sub_start, sub_len),
866                chunk_rep.slice(sub_start, sub_len),
867            )?;
868            values_consumed += written;
869            sub_start = sub_end;
870        }
871        Ok(values_consumed)
872    }
873
874    /// Creates a new streaming level encoder appropriate for the writer version.
875    fn create_level_encoder(max_level: i16, props: &WriterProperties) -> LevelEncoder {
876        match props.writer_version() {
877            WriterVersion::PARQUET_1_0 => LevelEncoder::v1_streaming(max_level),
878            WriterVersion::PARQUET_2_0 => LevelEncoder::v2_streaming(max_level),
879        }
880    }
881
882    /// Writes mini batch of values, definition and repetition levels.
883    /// This allows fine-grained processing of values and maintaining a reasonable
884    /// page size.
885    fn write_mini_batch(
886        &mut self,
887        values: &E::Values,
888        values_offset: usize,
889        value_indices: Option<&[usize]>,
890        num_levels: usize,
891        def_levels: LevelDataRef<'_>,
892        rep_levels: LevelDataRef<'_>,
893    ) -> Result<usize> {
894        // Process definition levels and determine how many values to write.
895        let values_to_write = if self.descr.max_def_level() > 0 {
896            let max_def = self.descr.max_def_level();
897            match def_levels {
898                LevelDataRef::Absent => {
899                    return Err(general_err!(
900                        "Definition levels are required, because max definition level = {}",
901                        self.descr.max_def_level()
902                    ));
903                }
904                LevelDataRef::Materialized(levels) => {
905                    // General path for caller-provided or already-materialized
906                    // level buffers.
907                    let mut values_to_write = 0usize;
908                    let encoder = &mut self.def_levels_encoder;
909                    match self.page_metrics.definition_level_histogram.as_mut() {
910                        Some(histogram) => encoder.put_with_observer(levels, |level, count| {
911                            values_to_write += count * (level == max_def) as usize;
912                            histogram.increment_by(level, count as i64);
913                        }),
914                        None => encoder.put_with_observer(levels, |level, count| {
915                            values_to_write += count * (level == max_def) as usize;
916                        }),
917                    };
918                    self.page_metrics.num_page_nulls += (levels.len() - values_to_write) as u64;
919                    values_to_write
920                }
921                LevelDataRef::Uniform { value, count } => {
922                    // Fast path for all-null, all-valid, or otherwise uniform
923                    // definition levels without materializing a level buffer.
924                    let encoder = &mut self.def_levels_encoder;
925                    match self.page_metrics.definition_level_histogram.as_mut() {
926                        Some(histogram) => {
927                            encoder.put_n_with_observer(value, count, |level, run_len| {
928                                histogram.increment_by(level, run_len as i64);
929                            })
930                        }
931                        None => encoder.put_n_with_observer(value, count, |_, _| {}),
932                    };
933                    let values_to_write = count * (value == max_def) as usize;
934                    self.page_metrics.num_page_nulls += (count - values_to_write) as u64;
935                    values_to_write
936                }
937            }
938        } else {
939            num_levels
940        };
941
942        // Process repetition levels and determine how many rows we are about to process.
943        if self.descr.max_rep_level() > 0 {
944            // A row could contain more than one value.
945            let first_level = rep_levels.first().ok_or_else(|| {
946                general_err!(
947                    "Repetition levels are required, because max repetition level = {}",
948                    self.descr.max_rep_level()
949                )
950            })?;
951
952            if first_level != 0 {
953                return Err(general_err!(
954                    "Write must start at a record boundary, got non-zero repetition level of {}",
955                    first_level
956                ));
957            }
958
959            let mut new_rows = 0u32;
960            match rep_levels {
961                LevelDataRef::Absent => unreachable!(),
962                LevelDataRef::Materialized(levels) => {
963                    let encoder = &mut self.rep_levels_encoder;
964                    match self.page_metrics.repetition_level_histogram.as_mut() {
965                        Some(histogram) => encoder.put_with_observer(levels, |level, count| {
966                            new_rows += (count as u32) * (level == 0) as u32;
967                            histogram.increment_by(level, count as i64);
968                        }),
969                        None => encoder.put_with_observer(levels, |level, count| {
970                            new_rows += (count as u32) * (level == 0) as u32;
971                        }),
972                    };
973                }
974                LevelDataRef::Uniform { value, count } => {
975                    let encoder = &mut self.rep_levels_encoder;
976                    match self.page_metrics.repetition_level_histogram.as_mut() {
977                        Some(histogram) => {
978                            encoder.put_n_with_observer(value, count, |level, run_len| {
979                                new_rows += (run_len as u32) * (level == 0) as u32;
980                                histogram.increment_by(level, run_len as i64);
981                            })
982                        }
983                        None => encoder.put_n_with_observer(value, count, |level, run_len| {
984                            new_rows += (run_len as u32) * (level == 0) as u32;
985                        }),
986                    };
987                }
988            }
989            self.page_metrics.num_buffered_rows += new_rows;
990        } else {
991            // Each value is exactly one row.
992            // Equals to the number of values, we count nulls as well.
993            self.page_metrics.num_buffered_rows += num_levels as u32;
994        }
995
996        match value_indices {
997            Some(indices) => {
998                let indices = &indices[values_offset..values_offset + values_to_write];
999                self.encoder.write_gather(values, indices)?;
1000            }
1001            None => self.encoder.write(values, values_offset, values_to_write)?,
1002        }
1003
1004        self.page_metrics.num_buffered_values += num_levels as u32;
1005
1006        if self.should_add_data_page() {
1007            self.add_data_page()?;
1008        }
1009
1010        if self.should_dict_fallback() {
1011            self.dict_fallback()?;
1012        }
1013
1014        Ok(values_to_write)
1015    }
1016
1017    /// Returns true if we need to fall back to non-dictionary encoding.
1018    ///
1019    /// We can only fall back if dictionary encoder is set and we have exceeded dictionary
1020    /// size.
1021    #[inline]
1022    fn should_dict_fallback(&self) -> bool {
1023        match self.encoder.estimated_dict_page_size() {
1024            Some(size) => {
1025                size >= self
1026                    .props
1027                    .column_dictionary_page_size_limit(self.descr.path())
1028            }
1029            None => false,
1030        }
1031    }
1032
1033    /// Returns true if there is enough data for a data page, false otherwise.
1034    #[inline]
1035    fn should_add_data_page(&self) -> bool {
1036        // This is necessary in the event of a much larger dictionary size than page size
1037        //
1038        // In such a scenario the dictionary decoder may return an estimated encoded
1039        // size in excess of the page size limit, even when there are no buffered values
1040        if self.page_metrics.num_buffered_values == 0 {
1041            return false;
1042        }
1043
1044        self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
1045            || self.encoder.estimated_data_page_size()
1046                >= self.props.column_data_page_size_limit(self.descr.path())
1047    }
1048
1049    /// Performs dictionary fallback.
1050    /// Prepares and writes dictionary and all data pages into page writer.
1051    fn dict_fallback(&mut self) -> Result<()> {
1052        // At this point we know that we need to fall back.
1053        if self.page_metrics.num_buffered_values > 0 {
1054            self.add_data_page()?;
1055        }
1056        self.write_dictionary_page()?;
1057        self.flush_data_pages()?;
1058        Ok(())
1059    }
1060
1061    /// Update the column index and offset index when adding the data page
1062    fn update_column_offset_index(
1063        &mut self,
1064        page_statistics: Option<&ValueStatistics<E::T>>,
1065        page_variable_length_bytes: Option<i64>,
1066    ) {
1067        // update the column index
1068        let null_page =
1069            (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
1070        // a page contains only null values,
1071        // and writers have to set the corresponding entries in min_values and max_values to byte[0]
1072        if null_page && self.column_index_builder.valid() {
1073            self.column_index_builder.append(
1074                null_page,
1075                vec![],
1076                vec![],
1077                self.page_metrics.num_page_nulls as i64,
1078            );
1079        } else if self.column_index_builder.valid() {
1080            // from page statistics
1081            // If can't get the page statistics, ignore this column/offset index for this column chunk
1082            match &page_statistics {
1083                None => {
1084                    self.column_index_builder.to_invalid();
1085                }
1086                Some(stat) => {
1087                    // Check if min/max are still ascending/descending across pages
1088                    let new_min = stat.min_opt().unwrap();
1089                    let new_max = stat.max_opt().unwrap();
1090                    if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
1091                        if self.data_page_boundary_ascending {
1092                            // If last min/max are greater than new min/max then not ascending anymore
1093                            let not_ascending = compare_greater(&self.descr, last_min, new_min)
1094                                || compare_greater(&self.descr, last_max, new_max);
1095                            if not_ascending {
1096                                self.data_page_boundary_ascending = false;
1097                            }
1098                        }
1099
1100                        if self.data_page_boundary_descending {
1101                            // If new min/max are greater than last min/max then not descending anymore
1102                            let not_descending = compare_greater(&self.descr, new_min, last_min)
1103                                || compare_greater(&self.descr, new_max, last_max);
1104                            if not_descending {
1105                                self.data_page_boundary_descending = false;
1106                            }
1107                        }
1108                    }
1109                    self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));
1110
1111                    if self.can_truncate_value() {
1112                        self.column_index_builder.append(
1113                            null_page,
1114                            self.truncate_min_value(
1115                                self.props.column_index_truncate_length(),
1116                                stat.min_bytes_opt().unwrap(),
1117                            )
1118                            .0,
1119                            self.truncate_max_value(
1120                                self.props.column_index_truncate_length(),
1121                                stat.max_bytes_opt().unwrap(),
1122                            )
1123                            .0,
1124                            self.page_metrics.num_page_nulls as i64,
1125                        );
1126                    } else {
1127                        self.column_index_builder.append(
1128                            null_page,
1129                            stat.min_bytes_opt().unwrap().to_vec(),
1130                            stat.max_bytes_opt().unwrap().to_vec(),
1131                            self.page_metrics.num_page_nulls as i64,
1132                        );
1133                    }
1134                }
1135            }
1136        }
1137
1138        // Append page histograms to the `ColumnIndex` histograms
1139        self.column_index_builder.append_histograms(
1140            &self.page_metrics.repetition_level_histogram,
1141            &self.page_metrics.definition_level_histogram,
1142        );
1143
1144        // Update the offset index
1145        if let Some(builder) = self.offset_index_builder.as_mut() {
1146            builder.append_row_count(self.page_metrics.num_buffered_rows as i64);
1147            builder.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
1148        }
1149    }
1150
1151    /// Determine if we should allow truncating min/max values for this column's statistics
1152    fn can_truncate_value(&self) -> bool {
1153        match self.descr.physical_type() {
1154            // Don't truncate for Float16 and Decimal because their sort order is different
1155            // from that of FIXED_LEN_BYTE_ARRAY sort order.
1156            // So truncation of those types could lead to inaccurate min/max statistics
1157            Type::FIXED_LEN_BYTE_ARRAY
1158                if !matches!(
1159                    self.descr.logical_type_ref(),
1160                    Some(&LogicalType::Decimal { .. }) | Some(&LogicalType::Float16)
1161                ) =>
1162            {
1163                true
1164            }
1165            Type::BYTE_ARRAY => true,
1166            // Truncation only applies for fba/binary physical types
1167            _ => false,
1168        }
1169    }
1170
1171    /// Returns `true` if this column's logical type is a UTF-8 string.
1172    fn is_utf8(&self) -> bool {
1173        self.get_descriptor().logical_type_ref() == Some(&LogicalType::String)
1174            || self.get_descriptor().converted_type() == ConvertedType::UTF8
1175    }
1176
1177    /// Truncates a binary statistic to at most `truncation_length` bytes.
1178    ///
1179    /// If truncation is not possible, returns `data`.
1180    ///
1181    /// The `bool` in the returned tuple indicates whether truncation occurred or not.
1182    ///
1183    /// UTF-8 Note:
1184    /// If the column type indicates UTF-8, and `data` contains valid UTF-8, then the result will
1185    /// also remain valid UTF-8, but may be less tnan `truncation_length` bytes to avoid splitting
1186    /// on non-character boundaries.
1187    fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
1188        truncation_length
1189            .filter(|l| data.len() > *l)
1190            .and_then(|l|
1191                // don't do extra work if this column isn't UTF-8
1192                if self.is_utf8() {
1193                    match str::from_utf8(data) {
1194                        Ok(str_data) => truncate_utf8(str_data, l),
1195                        Err(_) => Some(data[..l].to_vec()),
1196                    }
1197                } else {
1198                    Some(data[..l].to_vec())
1199                }
1200            )
1201            .map(|truncated| (truncated, true))
1202            .unwrap_or_else(|| (data.to_vec(), false))
1203    }
1204
1205    /// Truncates a binary statistic to at most `truncation_length` bytes, and then increment the
1206    /// final byte(s) to yield a valid upper bound. This may result in a result of less than
1207    /// `truncation_length` bytes if the last byte(s) overflows.
1208    ///
1209    /// If truncation is not possible, returns `data`.
1210    ///
1211    /// The `bool` in the returned tuple indicates whether truncation occurred or not.
1212    ///
1213    /// UTF-8 Note:
1214    /// If the column type indicates UTF-8, and `data` contains valid UTF-8, then the result will
1215    /// also remain valid UTF-8 (but again may be less than `truncation_length` bytes). If `data`
1216    /// does not contain valid UTF-8, then truncation will occur as if the column is non-string
1217    /// binary.
1218    fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
1219        truncation_length
1220            .filter(|l| data.len() > *l)
1221            .and_then(|l|
1222                // don't do extra work if this column isn't UTF-8
1223                if self.is_utf8() {
1224                    match str::from_utf8(data) {
1225                        Ok(str_data) => truncate_and_increment_utf8(str_data, l),
1226                        Err(_) => increment(data[..l].to_vec()),
1227                    }
1228                } else {
1229                    increment(data[..l].to_vec())
1230                }
1231            )
1232            .map(|truncated| (truncated, true))
1233            .unwrap_or_else(|| (data.to_vec(), false))
1234    }
1235
1236    /// Truncate the min and max values that will be written to a data page
1237    /// header or column chunk Statistics
1238    fn truncate_statistics(&self, statistics: Statistics) -> Statistics {
1239        let backwards_compatible_min_max = self.descr.sort_order().is_signed();
1240        match statistics {
1241            Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
1242                let (min, did_truncate_min) = self.truncate_min_value(
1243                    self.props.statistics_truncate_length(),
1244                    stats.min_bytes_opt().unwrap(),
1245                );
1246                let (max, did_truncate_max) = self.truncate_max_value(
1247                    self.props.statistics_truncate_length(),
1248                    stats.max_bytes_opt().unwrap(),
1249                );
1250                Statistics::ByteArray(
1251                    ValueStatistics::new(
1252                        Some(min.into()),
1253                        Some(max.into()),
1254                        stats.distinct_count(),
1255                        stats.null_count_opt(),
1256                        backwards_compatible_min_max,
1257                    )
1258                    .with_max_is_exact(!did_truncate_max)
1259                    .with_min_is_exact(!did_truncate_min),
1260                )
1261            }
1262            Statistics::FixedLenByteArray(stats)
1263                if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
1264            {
1265                let (min, did_truncate_min) = self.truncate_min_value(
1266                    self.props.statistics_truncate_length(),
1267                    stats.min_bytes_opt().unwrap(),
1268                );
1269                let (max, did_truncate_max) = self.truncate_max_value(
1270                    self.props.statistics_truncate_length(),
1271                    stats.max_bytes_opt().unwrap(),
1272                );
1273                Statistics::FixedLenByteArray(
1274                    ValueStatistics::new(
1275                        Some(min.into()),
1276                        Some(max.into()),
1277                        stats.distinct_count(),
1278                        stats.null_count_opt(),
1279                        backwards_compatible_min_max,
1280                    )
1281                    .with_max_is_exact(!did_truncate_max)
1282                    .with_min_is_exact(!did_truncate_min),
1283                )
1284            }
1285            stats => stats,
1286        }
1287    }
1288
1289    /// Adds data page.
1290    /// Data page is either buffered in case of dictionary encoding or written directly.
1291    pub(crate) fn add_data_page(&mut self) -> Result<()> {
1292        // Extract encoded values
1293        let values_data = self.encoder.flush_data_page()?;
1294
1295        let max_def_level = self.descr.max_def_level();
1296        let max_rep_level = self.descr.max_rep_level();
1297
1298        self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;
1299
1300        let page_statistics = match (values_data.min_value, values_data.max_value) {
1301            (Some(min), Some(max)) => {
1302                // Update chunk level statistics
1303                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
1304                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
1305
1306                (self.statistics_enabled == EnabledStatistics::Page).then_some(
1307                    ValueStatistics::new(
1308                        Some(min),
1309                        Some(max),
1310                        None,
1311                        Some(self.page_metrics.num_page_nulls),
1312                        false,
1313                    ),
1314                )
1315            }
1316            _ => None,
1317        };
1318
1319        // update column and offset index
1320        self.update_column_offset_index(
1321            page_statistics.as_ref(),
1322            values_data.variable_length_bytes,
1323        );
1324
1325        // Update histograms and variable_length_bytes in column_metrics
1326        self.column_metrics
1327            .update_from_page_metrics(&self.page_metrics);
1328        self.column_metrics
1329            .update_variable_length_bytes(values_data.variable_length_bytes);
1330
1331        // From here on, we only need page statistics if they will be written to the page header.
1332        let page_statistics = page_statistics
1333            .filter(|_| self.props.write_page_header_statistics(self.descr.path()))
1334            .map(|stats| self.truncate_statistics(Statistics::from(stats)));
1335
1336        let compressed_page = match self.props.writer_version() {
1337            WriterVersion::PARQUET_1_0 => {
1338                let mut buffer = vec![];
1339
1340                if max_rep_level > 0 {
1341                    self.rep_levels_encoder
1342                        .flush_to(|data| buffer.extend_from_slice(data));
1343                }
1344
1345                if max_def_level > 0 {
1346                    self.def_levels_encoder
1347                        .flush_to(|data| buffer.extend_from_slice(data));
1348                }
1349
1350                buffer.extend_from_slice(&values_data.buf);
1351                let uncompressed_size = buffer.len();
1352
1353                if let Some(ref mut cmpr) = self.compressor {
1354                    let mut compressed_buf = Vec::with_capacity(uncompressed_size);
1355                    cmpr.compress(&buffer[..], &mut compressed_buf)?;
1356                    compressed_buf.shrink_to_fit();
1357                    buffer = compressed_buf;
1358                }
1359
1360                let data_page = Page::DataPage {
1361                    buf: buffer.into(),
1362                    num_values: self.page_metrics.num_buffered_values,
1363                    encoding: values_data.encoding,
1364                    def_level_encoding: Encoding::RLE,
1365                    rep_level_encoding: Encoding::RLE,
1366                    statistics: page_statistics,
1367                };
1368
1369                CompressedPage::new(data_page, uncompressed_size)
1370            }
1371            WriterVersion::PARQUET_2_0 => {
1372                let mut rep_levels_byte_len = 0;
1373                let mut def_levels_byte_len = 0;
1374                let mut buffer = vec![];
1375
1376                if max_rep_level > 0 {
1377                    self.rep_levels_encoder
1378                        .flush_to(|data| buffer.extend_from_slice(data));
1379                    rep_levels_byte_len = buffer.len();
1380                }
1381
1382                if max_def_level > 0 {
1383                    self.def_levels_encoder
1384                        .flush_to(|data| buffer.extend_from_slice(data));
1385                    def_levels_byte_len = buffer.len() - rep_levels_byte_len;
1386                }
1387
1388                let uncompressed_size =
1389                    rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();
1390
1391                // Data Page v2 compresses values only.
1392                let is_compressed = match self.compressor {
1393                    Some(ref mut cmpr) => {
1394                        let buffer_len = buffer.len();
1395                        cmpr.compress(&values_data.buf, &mut buffer)?;
1396                        let compressed_values_size = buffer.len() - buffer_len;
1397                        let threshold = self
1398                            .props
1399                            .column_data_page_v2_compression_ratio_threshold(self.descr.path());
1400                        if (compressed_values_size as f64) >= (uncompressed_size as f64) * threshold
1401                        {
1402                            buffer.truncate(buffer_len);
1403                            buffer.extend_from_slice(&values_data.buf);
1404                            false
1405                        } else {
1406                            true
1407                        }
1408                    }
1409                    None => {
1410                        buffer.extend_from_slice(&values_data.buf);
1411                        false
1412                    }
1413                };
1414
1415                let data_page = Page::DataPageV2 {
1416                    buf: buffer.into(),
1417                    num_values: self.page_metrics.num_buffered_values,
1418                    encoding: values_data.encoding,
1419                    num_nulls: self.page_metrics.num_page_nulls as u32,
1420                    num_rows: self.page_metrics.num_buffered_rows,
1421                    def_levels_byte_len: def_levels_byte_len as u32,
1422                    rep_levels_byte_len: rep_levels_byte_len as u32,
1423                    is_compressed,
1424                    statistics: page_statistics,
1425                };
1426
1427                CompressedPage::new(data_page, uncompressed_size)
1428            }
1429        };
1430
1431        // Check if we need to buffer data page or flush it to the sink directly.
1432        //
1433        // For dictionary-encoded columns the dictionary page must be written
1434        // first, but it is not final until all values are seen, so completed
1435        // data pages are normally buffered here until `close`. A page writer
1436        // that defers final layout (the Arrow path) instead orders pages itself
1437        // at flush, so we stream the data pages straight through and never let
1438        // them accumulate in memory.
1439        if self.encoder.has_dictionary() && !self.page_writer.defers_dictionary_ordering() {
1440            self.data_pages.push_back(compressed_page);
1441        } else {
1442            self.write_data_page(compressed_page)?;
1443        }
1444
1445        // Update total number of rows.
1446        self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;
1447        self.page_metrics.new_page();
1448
1449        Ok(())
1450    }
1451
1452    /// Finalises any outstanding data pages and flushes buffered data pages from
1453    /// dictionary encoding into underlying sink.
1454    #[inline]
1455    fn flush_data_pages(&mut self) -> Result<()> {
1456        // Write all outstanding data to a new page.
1457        if self.page_metrics.num_buffered_values > 0 {
1458            self.add_data_page()?;
1459        }
1460
1461        while let Some(page) = self.data_pages.pop_front() {
1462            self.write_data_page(page)?;
1463        }
1464
1465        Ok(())
1466    }
1467
    /// Assembles column chunk metadata.
    ///
    /// Builds a [`ColumnChunkMetaData`] from the metrics accumulated while pages were
    /// written: compression codec, the set of encodings used, per-page encoding statistics,
    /// total compressed/uncompressed sizes, value count, and data/dictionary page offsets.
    ///
    /// When statistics are enabled (anything other than `EnabledStatistics::None`), it also
    /// attaches:
    /// - chunk-level statistics, passed through `truncate_statistics`;
    /// - the unencoded byte array size;
    /// - the repetition/definition level histograms;
    /// - any geospatial statistics returned by the encoder.
    ///
    /// The histograms are moved out of `self.column_metrics` with `take()`, leaving `None`
    /// behind.
    ///
    /// The builder is then passed through `set_column_chunk_encryption_properties` before
    /// `build()`. Any error from `build()` is propagated.
    fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
        let total_compressed_size = self.column_metrics.total_compressed_size as i64;
        let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
        let num_values = self.column_metrics.total_num_values as i64;
        let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
        // If the data page offset is not set, no data page has been written; 0 is used as
        // a placeholder offset in that case.
        let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;

        let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
            .set_compression(self.codec)
            .set_encodings_mask(EncodingMask::new_from_encodings(self.encodings.iter()))
            .set_page_encoding_stats(self.encoding_stats.clone())
            .set_total_compressed_size(total_compressed_size)
            .set_total_uncompressed_size(total_uncompressed_size)
            .set_num_values(num_values)
            .set_data_page_offset(data_page_offset)
            .set_dictionary_page_offset(dict_page_offset);

        if self.statistics_enabled != EnabledStatistics::None {
            // NOTE(review): presumably the legacy (pre-sort-order) min/max fields are only
            // valid for signed sort orders, hence the flag — confirm against ValueStatistics.
            let backwards_compatible_min_max = self.descr.sort_order().is_signed();

            let statistics = ValueStatistics::<E::T>::new(
                self.column_metrics.min_column_value.clone(),
                self.column_metrics.max_column_value.clone(),
                self.column_metrics.column_distinct_count,
                Some(self.column_metrics.num_column_nulls),
                false,
            )
            .with_backwards_compatible_min_max(backwards_compatible_min_max)
            .into();

            let statistics = self.truncate_statistics(statistics);

            // The level histograms are moved out of the column metrics here.
            builder = builder
                .set_statistics(statistics)
                .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
                .set_repetition_level_histogram(
                    self.column_metrics.repetition_level_histogram.take(),
                )
                .set_definition_level_histogram(
                    self.column_metrics.definition_level_histogram.take(),
                );

            if let Some(geo_stats) = self.encoder.flush_geospatial_statistics() {
                builder = builder.set_geo_statistics(geo_stats);
            }
        }

        // Adds column crypto metadata when file encryption is configured (no-op otherwise).
        builder = self.set_column_chunk_encryption_properties(builder);

        let metadata = builder.build()?;
        Ok(metadata)
    }
1522
1523    /// Writes compressed data page into underlying sink and updates global metrics.
1524    #[inline]
1525    fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
1526        self.encodings.insert(page.encoding());
1527        match self.encoding_stats.last_mut() {
1528            Some(encoding_stats)
1529                if encoding_stats.page_type == page.page_type()
1530                    && encoding_stats.encoding == page.encoding() =>
1531            {
1532                encoding_stats.count += 1;
1533            }
1534            _ => {
1535                // data page type does not change inside a file
1536                // encoding can currently only change from dictionary to non-dictionary once
1537                self.encoding_stats.push(PageEncodingStats {
1538                    page_type: page.page_type(),
1539                    encoding: page.encoding(),
1540                    count: 1,
1541                });
1542            }
1543        }
1544        let page_spec = self.page_writer.write_page(page)?;
1545        // update offset index
1546        // compressed_size = header_size + compressed_data_size
1547        if let Some(builder) = self.offset_index_builder.as_mut() {
1548            builder
1549                .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32)
1550        }
1551        self.update_metrics_for_page(page_spec);
1552        Ok(())
1553    }
1554
1555    /// Writes dictionary page into underlying sink.
1556    #[inline]
1557    fn write_dictionary_page(&mut self) -> Result<()> {
1558        let compressed_page = {
1559            let mut page = self
1560                .encoder
1561                .flush_dict_page()?
1562                .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;
1563
1564            let uncompressed_size = page.buf.len();
1565
1566            if let Some(ref mut cmpr) = self.compressor {
1567                let mut output_buf = Vec::with_capacity(uncompressed_size);
1568                cmpr.compress(&page.buf, &mut output_buf)?;
1569                page.buf = Bytes::from(output_buf);
1570            }
1571
1572            let dict_page = Page::DictionaryPage {
1573                buf: page.buf,
1574                num_values: page.num_values as u32,
1575                encoding: self.props.dictionary_page_encoding(),
1576                is_sorted: page.is_sorted,
1577            };
1578            CompressedPage::new(dict_page, uncompressed_size)
1579        };
1580
1581        self.encodings.insert(compressed_page.encoding());
1582        self.encoding_stats.push(PageEncodingStats {
1583            page_type: PageType::DICTIONARY_PAGE,
1584            encoding: compressed_page.encoding(),
1585            count: 1,
1586        });
1587        let page_spec = self.page_writer.write_page(compressed_page)?;
1588        self.update_metrics_for_page(page_spec);
1589        // For the directory page, don't need to update column/offset index.
1590        Ok(())
1591    }
1592
1593    /// Updates column writer metrics with each page metadata.
1594    #[inline]
1595    fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
1596        self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
1597        self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
1598        self.column_metrics.total_bytes_written += page_spec.bytes_written;
1599
1600        match page_spec.page_type {
1601            PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
1602                self.column_metrics.total_num_values += page_spec.num_values as u64;
1603                if self.column_metrics.data_page_offset.is_none() {
1604                    self.column_metrics.data_page_offset = Some(page_spec.offset);
1605                }
1606            }
1607            PageType::DICTIONARY_PAGE => {
1608                assert!(
1609                    self.column_metrics.dictionary_page_offset.is_none(),
1610                    "Dictionary offset is already set"
1611                );
1612                self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
1613            }
1614            _ => {}
1615        }
1616    }
1617
1618    #[inline]
1619    #[cfg(feature = "encryption")]
1620    fn set_column_chunk_encryption_properties(
1621        &self,
1622        builder: ColumnChunkMetaDataBuilder,
1623    ) -> ColumnChunkMetaDataBuilder {
1624        if let Some(encryption_properties) = self.props.file_encryption_properties.as_ref() {
1625            builder.set_column_crypto_metadata(get_column_crypto_metadata(
1626                encryption_properties,
1627                &self.descr,
1628            ))
1629        } else {
1630            builder
1631        }
1632    }
1633
    /// Variant used when the `encryption` feature is disabled: there is no crypto
    /// metadata to add, so `builder` is returned unchanged. This lets callers invoke the
    /// method unconditionally.
    #[inline]
    #[cfg(not(feature = "encryption"))]
    fn set_column_chunk_encryption_properties(
        &self,
        builder: ColumnChunkMetaDataBuilder,
    ) -> ColumnChunkMetaDataBuilder {
        builder
    }
1642}
1643
1644fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
1645    update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val))
1646}
1647
1648fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) {
1649    update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur))
1650}
1651
1652#[inline]
1653#[allow(clippy::eq_op)]
1654fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
1655    match T::PHYSICAL_TYPE {
1656        Type::FLOAT | Type::DOUBLE => val != val,
1657        Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type_ref() == Some(&LogicalType::Float16) => {
1658            let val = val.as_bytes();
1659            let val = f16::from_le_bytes([val[0], val[1]]);
1660            val.is_nan()
1661        }
1662        _ => false,
1663    }
1664}
1665
1666/// Perform a conditional update of `cur`, skipping any NaN values
1667///
1668/// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with
1669/// the value of `cur`, and updates `cur` to `Some(val)` if it returns `true`
1670fn update_stat<T: ParquetValueType, F>(
1671    descr: &ColumnDescriptor,
1672    val: &T,
1673    cur: &mut Option<T>,
1674    should_update: F,
1675) where
1676    F: Fn(&T) -> bool,
1677{
1678    if is_nan(descr, val) {
1679        return;
1680    }
1681
1682    if cur.as_ref().is_none_or(should_update) {
1683        *cur = Some(val.clone());
1684    }
1685}
1686
1687/// Evaluate `a > b` according to underlying logical type.
1688fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
1689    match T::PHYSICAL_TYPE {
1690        Type::INT32 | Type::INT64 => {
1691            if let Some(LogicalType::Integer(IntType {
1692                is_signed: false, ..
1693            })) = descr.logical_type_ref()
1694            {
1695                // need to compare unsigned
1696                return compare_greater_unsigned_int(a, b);
1697            }
1698
1699            match descr.converted_type() {
1700                ConvertedType::UINT_8
1701                | ConvertedType::UINT_16
1702                | ConvertedType::UINT_32
1703                | ConvertedType::UINT_64 => {
1704                    return compare_greater_unsigned_int(a, b);
1705                }
1706                _ => {}
1707            };
1708        }
1709        Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1710            if let Some(LogicalType::Decimal(_)) = descr.logical_type_ref() {
1711                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1712            }
1713            if let ConvertedType::DECIMAL = descr.converted_type() {
1714                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1715            }
1716            if let Some(LogicalType::Float16) = descr.logical_type_ref() {
1717                return compare_greater_f16(a.as_bytes(), b.as_bytes());
1718            }
1719        }
1720
1721        _ => {}
1722    }
1723
1724    // compare independent of logical / converted type
1725    a > b
1726}
1727
1728// ----------------------------------------------------------------------
1729// Encoding support for column writer.
1730// This mirrors parquet-mr default encodings for writes. See:
1731// https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java
1732// https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java
1733
1734/// Returns encoding for a column when no other encoding is provided in writer properties.
1735fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
1736    match (kind, props.writer_version()) {
1737        (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
1738        (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1739        (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1740        (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1741        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1742        _ => Encoding::PLAIN,
1743    }
1744}
1745
1746/// Returns true if dictionary is supported for column writer, false otherwise.
1747fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
1748    match (kind, props.writer_version()) {
1749        // Booleans do not support dict encoding and should use a fallback encoding.
1750        (Type::BOOLEAN, _) => false,
1751        // Dictionary encoding was not enabled in PARQUET 1.0
1752        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
1753        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
1754        _ => true,
1755    }
1756}
1757
1758#[inline]
1759fn compare_greater_unsigned_int<T: ParquetValueType>(a: &T, b: &T) -> bool {
1760    a.as_u64().unwrap() > b.as_u64().unwrap()
1761}
1762
1763#[inline]
1764fn compare_greater_f16(a: &[u8], b: &[u8]) -> bool {
1765    let a = f16::from_le_bytes(a.try_into().unwrap());
1766    let b = f16::from_le_bytes(b.try_into().unwrap());
1767    a > b
1768}
1769
/// Signed comparison of byte arrays holding big-endian two's complement integers, as used
/// for `DECIMAL` values stored as `BYTE_ARRAY` or `FIXED_LEN_BYTE_ARRAY`.
///
/// Returns `true` if `a > b`. An empty array orders before every non-empty array, and two
/// empty arrays are equal.
///
/// The arrays may differ in length. The shorter value is treated as if it were
/// sign-extended to the length of the longer one, so e.g. `0xFF10` and `0x10` compare
/// equal.
fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
    let (Some(&first_a), Some(&first_b)) = (a.first(), b.first()) else {
        // At least one side is empty; `a` is greater only if it is the non-empty one.
        return !a.is_empty();
    };

    // We can short circuit for numbers of different sign, or for equal length byte
    // arrays whose first bytes differ. The equal-length requirement is needed because of
    // sign extension: 0xFF10 is equal to 0x10.
    if (0x80 & first_a) != (0x80 & first_b) || (a.len() == b.len() && first_a != first_b) {
        return (first_a as i8) > (first_b as i8);
    }

    // From here on both values have the same sign.
    let negative = (first_a as i8) < 0;
    let extension: u8 = if negative { 0xFF } else { 0 };

    // Split the longer value into its extra leading bytes and a tail with the same length
    // as the shorter value, so both tails are aligned on the least significant byte.
    // For equal lengths the lead is empty and the tails are the whole arrays.
    let a_longer = a.len() > b.len();
    let lead_length = a.len().abs_diff(b.len());
    let (lead, a_tail, b_tail) = if a_longer {
        let (lead, tail) = a.split_at(lead_length);
        (lead, tail, b)
    } else {
        let (lead, tail) = b.split_at(lead_length);
        (lead, a, tail)
    };

    // If the extra bytes are not pure sign extension, the longer value has the larger
    // magnitude. It is therefore greater when positive and lesser when negative.
    if lead.iter().any(|&x| x != extension) {
        return a_longer != negative;
    }

    // The tails have equal length and the values share a sign, so an unsigned
    // lexicographic comparison matches the signed ordering. Comparing misaligned slices
    // (e.g. `a[1..]` vs `b[1..]` for different lengths) would be wrong here.
    a_tail > b_tail
}
1815
/// Truncates `data` to its longest non-empty prefix of at most `length` bytes that ends on
/// a char boundary (and so is still valid UTF-8), returning the prefix bytes.
///
/// Returns `None` when no such prefix exists, i.e. when `length` is zero or the first
/// character alone is longer than `length` bytes.
///
/// The caller guarantees that `data.len() > length`.
fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    let end = (1..=length)
        .rev()
        .find(|&idx| data.is_char_boundary(idx))?;
    data.get(..end).map(|prefix| prefix.as_bytes().to_vec())
}
1825
1826/// Truncate a UTF-8 slice and increment it's final character. The returned value is the
1827/// longest such slice that is still a valid UTF-8 string while being less than `length`
1828/// bytes and non-empty. Returns `None` if no such transformation is possible.
1829///
1830/// The caller guarantees that data.len() > length.
1831fn truncate_and_increment_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
1832    // UTF-8 is max 4 bytes, so start search 3 back from desired length
1833    let lower_bound = length.saturating_sub(3);
1834    let split = (lower_bound..=length).rfind(|x| data.is_char_boundary(*x))?;
1835    increment_utf8(data.get(..split)?)
1836}
1837
/// Increments the last character of `data` that can be incremented without changing its
/// UTF-8 encoded width, drops every character after it, and returns the resulting bytes.
/// The result is valid UTF-8 and may be shorter than the input.
///
/// A character is skipped when its successor is not a valid `char` (past `char::MAX` or a
/// surrogate code point) or would need more bytes. Returns `None` if no character
/// qualifies, including for an empty string.
///
/// Note that this implementation will not promote an N-byte code point to (N+1) bytes.
fn increment_utf8(data: &str) -> Option<Vec<u8>> {
    data.char_indices().rev().find_map(|(start, ch)| {
        let width = ch.len_utf8();
        let successor = char::from_u32(ch as u32 + 1).filter(|next| next.len_utf8() == width)?;
        let mut bytes = data.as_bytes()[..start + width].to_vec();
        successor.encode_utf8(&mut bytes[start..]);
        Some(bytes)
    })
}
1859
/// Treats `data` as a big-endian unsigned integer and adds one.
///
/// Trailing `u8::MAX` bytes wrap to zero and carry into the byte before them. Returns
/// `None` when every byte is `u8::MAX` (including for an empty vector), since the carry
/// would overflow.
fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
    // The carry stops at the rightmost byte that is not already at its maximum.
    let carry_stop = data.iter().rposition(|&byte| byte != u8::MAX)?;
    data[carry_stop] += 1;
    data[carry_stop + 1..].fill(0);
    Some(data)
}
1875
1876#[cfg(test)]
1877mod tests {
1878    use crate::{
1879        file::{properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, writer::SerializedFileWriter},
1880        schema::parser::parse_message_type,
1881    };
1882    use core::str;
1883    use rand::distr::uniform::SampleUniform;
1884    use std::{fs::File, sync::Arc};
1885
1886    use crate::column::{
1887        page::PageReader,
1888        reader::{ColumnReaderImpl, get_column_reader, get_typed_column_reader},
1889    };
1890    use crate::file::writer::TrackedWrite;
1891    use crate::file::{
1892        properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
1893    };
1894    use crate::schema::types::{ColumnPath, Type as SchemaType};
1895    use crate::util::test_common::rand_gen::random_numbers_range;
1896
1897    use super::*;
1898
1899    #[test]
1900    fn test_column_writer_inconsistent_def_rep_length() {
1901        let page_writer = get_test_page_writer();
1902        let props = Default::default();
1903        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 1, props);
1904        let res = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0]));
1905        assert!(res.is_err());
1906        if let Err(err) = res {
1907            assert_eq!(
1908                format!("{err}"),
1909                "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2"
1910            );
1911        }
1912    }
1913
1914    #[test]
1915    fn test_column_writer_invalid_def_levels() {
1916        let page_writer = get_test_page_writer();
1917        let props = Default::default();
1918        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1919        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1920        assert!(res.is_err());
1921        if let Err(err) = res {
1922            assert_eq!(
1923                format!("{err}"),
1924                "Parquet error: Definition levels are required, because max definition level = 1"
1925            );
1926        }
1927    }
1928
1929    #[test]
1930    fn test_column_writer_invalid_rep_levels() {
1931        let page_writer = get_test_page_writer();
1932        let props = Default::default();
1933        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 1, props);
1934        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1935        assert!(res.is_err());
1936        if let Err(err) = res {
1937            assert_eq!(
1938                format!("{err}"),
1939                "Parquet error: Repetition levels are required, because max repetition level = 1"
1940            );
1941        }
1942    }
1943
1944    #[test]
1945    fn test_column_writer_not_enough_values_to_write() {
1946        let page_writer = get_test_page_writer();
1947        let props = Default::default();
1948        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1949        let res = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None);
1950        assert!(res.is_err());
1951        if let Err(err) = res {
1952            assert_eq!(
1953                format!("{err}"),
1954                "Parquet error: Expected to write 4 values, but have only 2"
1955            );
1956        }
1957    }
1958
1959    #[test]
1960    fn test_column_writer_write_only_one_dictionary_page() {
1961        let page_writer = get_test_page_writer();
1962        let props = Default::default();
1963        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1964        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1965        // First page should be correctly written.
1966        writer.add_data_page().unwrap();
1967        writer.write_dictionary_page().unwrap();
1968        let err = writer.write_dictionary_page().unwrap_err().to_string();
1969        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1970    }
1971
1972    #[test]
1973    fn test_column_writer_error_when_writing_disabled_dictionary() {
1974        let page_writer = get_test_page_writer();
1975        let props = Arc::new(
1976            WriterProperties::builder()
1977                .set_dictionary_enabled(false)
1978                .build(),
1979        );
1980        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1981        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1982        let err = writer.write_dictionary_page().unwrap_err().to_string();
1983        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1984    }
1985
1986    #[test]
1987    fn test_column_writer_boolean_type_does_not_support_dictionary() {
1988        let page_writer = get_test_page_writer();
1989        let props = Arc::new(
1990            WriterProperties::builder()
1991                .set_dictionary_enabled(true)
1992                .build(),
1993        );
1994        let mut writer = get_test_column_writer::<BoolType>(page_writer, 0, 0, props);
1995        writer
1996            .write_batch(&[true, false, true, false], None, None)
1997            .unwrap();
1998
1999        let r = writer.close().unwrap();
2000        // PlainEncoder uses bit writer to write boolean values, which all fit into 1
2001        // byte.
2002        assert_eq!(r.bytes_written, 1);
2003        assert_eq!(r.rows_written, 4);
2004
2005        let metadata = r.metadata;
2006        assert_eq!(
2007            metadata.encodings().collect::<Vec<_>>(),
2008            vec![Encoding::PLAIN, Encoding::RLE]
2009        );
2010        assert_eq!(metadata.num_values(), 4); // just values
2011        assert_eq!(metadata.dictionary_page_offset(), None);
2012    }
2013
2014    #[test]
2015    fn test_column_writer_default_encoding_support_bool() {
2016        check_encoding_write_support::<BoolType>(
2017            WriterVersion::PARQUET_1_0,
2018            true,
2019            &[true, false],
2020            None,
2021            &[Encoding::PLAIN, Encoding::RLE],
2022            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2023        );
2024        check_encoding_write_support::<BoolType>(
2025            WriterVersion::PARQUET_1_0,
2026            false,
2027            &[true, false],
2028            None,
2029            &[Encoding::PLAIN, Encoding::RLE],
2030            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2031        );
2032        check_encoding_write_support::<BoolType>(
2033            WriterVersion::PARQUET_2_0,
2034            true,
2035            &[true, false],
2036            None,
2037            &[Encoding::RLE],
2038            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
2039        );
2040        check_encoding_write_support::<BoolType>(
2041            WriterVersion::PARQUET_2_0,
2042            false,
2043            &[true, false],
2044            None,
2045            &[Encoding::RLE],
2046            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
2047        );
2048    }
2049
2050    #[test]
2051    fn test_column_writer_default_encoding_support_int32() {
2052        check_encoding_write_support::<Int32Type>(
2053            WriterVersion::PARQUET_1_0,
2054            true,
2055            &[1, 2],
2056            Some(0),
2057            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2058            &[
2059                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2060                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
2061            ],
2062        );
2063        check_encoding_write_support::<Int32Type>(
2064            WriterVersion::PARQUET_1_0,
2065            false,
2066            &[1, 2],
2067            None,
2068            &[Encoding::PLAIN, Encoding::RLE],
2069            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2070        );
2071        check_encoding_write_support::<Int32Type>(
2072            WriterVersion::PARQUET_2_0,
2073            true,
2074            &[1, 2],
2075            Some(0),
2076            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2077            &[
2078                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2079                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
2080            ],
2081        );
2082        check_encoding_write_support::<Int32Type>(
2083            WriterVersion::PARQUET_2_0,
2084            false,
2085            &[1, 2],
2086            None,
2087            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
2088            &[encoding_stats(
2089                PageType::DATA_PAGE_V2,
2090                Encoding::DELTA_BINARY_PACKED,
2091                1,
2092            )],
2093        );
2094    }
2095
2096    #[test]
2097    fn test_column_writer_default_encoding_support_int64() {
2098        check_encoding_write_support::<Int64Type>(
2099            WriterVersion::PARQUET_1_0,
2100            true,
2101            &[1, 2],
2102            Some(0),
2103            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2104            &[
2105                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2106                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
2107            ],
2108        );
2109        check_encoding_write_support::<Int64Type>(
2110            WriterVersion::PARQUET_1_0,
2111            false,
2112            &[1, 2],
2113            None,
2114            &[Encoding::PLAIN, Encoding::RLE],
2115            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2116        );
2117        check_encoding_write_support::<Int64Type>(
2118            WriterVersion::PARQUET_2_0,
2119            true,
2120            &[1, 2],
2121            Some(0),
2122            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2123            &[
2124                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2125                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
2126            ],
2127        );
2128        check_encoding_write_support::<Int64Type>(
2129            WriterVersion::PARQUET_2_0,
2130            false,
2131            &[1, 2],
2132            None,
2133            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
2134            &[encoding_stats(
2135                PageType::DATA_PAGE_V2,
2136                Encoding::DELTA_BINARY_PACKED,
2137                1,
2138            )],
2139        );
2140    }
2141
2142    #[test]
2143    fn test_column_writer_default_encoding_support_int96() {
2144        check_encoding_write_support::<Int96Type>(
2145            WriterVersion::PARQUET_1_0,
2146            true,
2147            &[Int96::from(vec![1, 2, 3])],
2148            Some(0),
2149            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2150            &[
2151                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2152                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
2153            ],
2154        );
2155        check_encoding_write_support::<Int96Type>(
2156            WriterVersion::PARQUET_1_0,
2157            false,
2158            &[Int96::from(vec![1, 2, 3])],
2159            None,
2160            &[Encoding::PLAIN, Encoding::RLE],
2161            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2162        );
2163        check_encoding_write_support::<Int96Type>(
2164            WriterVersion::PARQUET_2_0,
2165            true,
2166            &[Int96::from(vec![1, 2, 3])],
2167            Some(0),
2168            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2169            &[
2170                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2171                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
2172            ],
2173        );
2174        check_encoding_write_support::<Int96Type>(
2175            WriterVersion::PARQUET_2_0,
2176            false,
2177            &[Int96::from(vec![1, 2, 3])],
2178            None,
2179            &[Encoding::PLAIN, Encoding::RLE],
2180            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
2181        );
2182    }
2183
2184    #[test]
2185    fn test_column_writer_default_encoding_support_float() {
2186        check_encoding_write_support::<FloatType>(
2187            WriterVersion::PARQUET_1_0,
2188            true,
2189            &[1.0, 2.0],
2190            Some(0),
2191            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2192            &[
2193                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2194                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
2195            ],
2196        );
2197        check_encoding_write_support::<FloatType>(
2198            WriterVersion::PARQUET_1_0,
2199            false,
2200            &[1.0, 2.0],
2201            None,
2202            &[Encoding::PLAIN, Encoding::RLE],
2203            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2204        );
2205        check_encoding_write_support::<FloatType>(
2206            WriterVersion::PARQUET_2_0,
2207            true,
2208            &[1.0, 2.0],
2209            Some(0),
2210            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2211            &[
2212                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2213                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
2214            ],
2215        );
2216        check_encoding_write_support::<FloatType>(
2217            WriterVersion::PARQUET_2_0,
2218            false,
2219            &[1.0, 2.0],
2220            None,
2221            &[Encoding::PLAIN, Encoding::RLE],
2222            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
2223        );
2224    }
2225
2226    #[test]
2227    fn test_column_writer_default_encoding_support_double() {
2228        check_encoding_write_support::<DoubleType>(
2229            WriterVersion::PARQUET_1_0,
2230            true,
2231            &[1.0, 2.0],
2232            Some(0),
2233            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2234            &[
2235                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2236                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
2237            ],
2238        );
2239        check_encoding_write_support::<DoubleType>(
2240            WriterVersion::PARQUET_1_0,
2241            false,
2242            &[1.0, 2.0],
2243            None,
2244            &[Encoding::PLAIN, Encoding::RLE],
2245            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2246        );
2247        check_encoding_write_support::<DoubleType>(
2248            WriterVersion::PARQUET_2_0,
2249            true,
2250            &[1.0, 2.0],
2251            Some(0),
2252            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2253            &[
2254                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2255                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
2256            ],
2257        );
2258        check_encoding_write_support::<DoubleType>(
2259            WriterVersion::PARQUET_2_0,
2260            false,
2261            &[1.0, 2.0],
2262            None,
2263            &[Encoding::PLAIN, Encoding::RLE],
2264            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
2265        );
2266    }
2267
2268    #[test]
2269    fn test_column_writer_default_encoding_support_byte_array() {
2270        check_encoding_write_support::<ByteArrayType>(
2271            WriterVersion::PARQUET_1_0,
2272            true,
2273            &[ByteArray::from(vec![1u8])],
2274            Some(0),
2275            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2276            &[
2277                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2278                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
2279            ],
2280        );
2281        check_encoding_write_support::<ByteArrayType>(
2282            WriterVersion::PARQUET_1_0,
2283            false,
2284            &[ByteArray::from(vec![1u8])],
2285            None,
2286            &[Encoding::PLAIN, Encoding::RLE],
2287            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2288        );
2289        check_encoding_write_support::<ByteArrayType>(
2290            WriterVersion::PARQUET_2_0,
2291            true,
2292            &[ByteArray::from(vec![1u8])],
2293            Some(0),
2294            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2295            &[
2296                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2297                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
2298            ],
2299        );
2300        check_encoding_write_support::<ByteArrayType>(
2301            WriterVersion::PARQUET_2_0,
2302            false,
2303            &[ByteArray::from(vec![1u8])],
2304            None,
2305            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
2306            &[encoding_stats(
2307                PageType::DATA_PAGE_V2,
2308                Encoding::DELTA_BYTE_ARRAY,
2309                1,
2310            )],
2311        );
2312    }
2313
2314    #[test]
2315    fn test_column_writer_default_encoding_support_fixed_len_byte_array() {
2316        check_encoding_write_support::<FixedLenByteArrayType>(
2317            WriterVersion::PARQUET_1_0,
2318            true,
2319            &[ByteArray::from(vec![1u8]).into()],
2320            None,
2321            &[Encoding::PLAIN, Encoding::RLE],
2322            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2323        );
2324        check_encoding_write_support::<FixedLenByteArrayType>(
2325            WriterVersion::PARQUET_1_0,
2326            false,
2327            &[ByteArray::from(vec![1u8]).into()],
2328            None,
2329            &[Encoding::PLAIN, Encoding::RLE],
2330            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2331        );
2332        check_encoding_write_support::<FixedLenByteArrayType>(
2333            WriterVersion::PARQUET_2_0,
2334            true,
2335            &[ByteArray::from(vec![1u8]).into()],
2336            Some(0),
2337            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2338            &[
2339                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2340                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
2341            ],
2342        );
2343        check_encoding_write_support::<FixedLenByteArrayType>(
2344            WriterVersion::PARQUET_2_0,
2345            false,
2346            &[ByteArray::from(vec![1u8]).into()],
2347            None,
2348            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
2349            &[encoding_stats(
2350                PageType::DATA_PAGE_V2,
2351                Encoding::DELTA_BYTE_ARRAY,
2352                1,
2353            )],
2354        );
2355    }
2356
2357    #[test]
2358    fn test_column_writer_check_metadata() {
2359        let page_writer = get_test_page_writer();
2360        let props = Default::default();
2361        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2362        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2363
2364        let r = writer.close().unwrap();
2365        assert_eq!(r.bytes_written, 20);
2366        assert_eq!(r.rows_written, 4);
2367
2368        let metadata = r.metadata;
2369        assert_eq!(
2370            metadata.encodings().collect::<Vec<_>>(),
2371            vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2372        );
2373        assert_eq!(metadata.num_values(), 4);
2374        assert_eq!(metadata.compressed_size(), 20);
2375        assert_eq!(metadata.uncompressed_size(), 20);
2376        assert_eq!(metadata.data_page_offset(), 0);
2377        assert_eq!(metadata.dictionary_page_offset(), Some(0));
2378        if let Some(stats) = metadata.statistics() {
2379            assert_eq!(stats.null_count_opt(), Some(0));
2380            assert_eq!(stats.distinct_count_opt(), None);
2381            if let Statistics::Int32(stats) = stats {
2382                assert_eq!(stats.min_opt().unwrap(), &1);
2383                assert_eq!(stats.max_opt().unwrap(), &4);
2384            } else {
2385                panic!("expecting Statistics::Int32");
2386            }
2387        } else {
2388            panic!("metadata missing statistics");
2389        }
2390    }
2391
2392    #[test]
2393    fn test_column_writer_check_byte_array_min_max() {
2394        let page_writer = get_test_page_writer();
2395        let props = Default::default();
2396        let mut writer = get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
2397        writer
2398            .write_batch(
2399                &[
2400                    ByteArray::from(vec![
2401                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2402                        35u8, 231u8, 90u8, 0u8, 0u8,
2403                    ]),
2404                    ByteArray::from(vec![
2405                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8, 62u8, 146u8,
2406                        152u8, 177u8, 56u8, 0u8, 0u8,
2407                    ]),
2408                    ByteArray::from(vec![
2409                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
2410                        0u8,
2411                    ]),
2412                    ByteArray::from(vec![
2413                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2414                        44u8, 0u8, 0u8,
2415                    ]),
2416                ],
2417                None,
2418                None,
2419            )
2420            .unwrap();
2421        let metadata = writer.close().unwrap().metadata;
2422        if let Some(stats) = metadata.statistics() {
2423            if let Statistics::ByteArray(stats) = stats {
2424                assert_eq!(
2425                    stats.min_opt().unwrap(),
2426                    &ByteArray::from(vec![
2427                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2428                        35u8, 231u8, 90u8, 0u8, 0u8,
2429                    ])
2430                );
2431                assert_eq!(
2432                    stats.max_opt().unwrap(),
2433                    &ByteArray::from(vec![
2434                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2435                        44u8, 0u8, 0u8,
2436                    ])
2437                );
2438            } else {
2439                panic!("expecting Statistics::ByteArray");
2440            }
2441        } else {
2442            panic!("metadata missing statistics");
2443        }
2444    }
2445
2446    #[test]
2447    fn test_column_writer_uint32_converted_type_min_max() {
2448        let page_writer = get_test_page_writer();
2449        let props = Default::default();
2450        let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
2451            page_writer,
2452            0,
2453            0,
2454            props,
2455        );
2456        writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
2457        let metadata = writer.close().unwrap().metadata;
2458        if let Some(stats) = metadata.statistics() {
2459            if let Statistics::Int32(stats) = stats {
2460                assert_eq!(stats.min_opt().unwrap(), &0,);
2461                assert_eq!(stats.max_opt().unwrap(), &5,);
2462            } else {
2463                panic!("expecting Statistics::Int32");
2464            }
2465        } else {
2466            panic!("metadata missing statistics");
2467        }
2468    }
2469
2470    #[test]
2471    fn test_column_writer_precalculated_statistics() {
2472        let page_writer = get_test_page_writer();
2473        let props = Arc::new(
2474            WriterProperties::builder()
2475                .set_statistics_enabled(EnabledStatistics::Chunk)
2476                .build(),
2477        );
2478        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2479        writer
2480            .write_batch_with_statistics(
2481                &[1, 2, 3, 4],
2482                None,
2483                None,
2484                Some(&-17),
2485                Some(&9000),
2486                Some(55),
2487            )
2488            .unwrap();
2489
2490        let r = writer.close().unwrap();
2491        assert_eq!(r.bytes_written, 20);
2492        assert_eq!(r.rows_written, 4);
2493
2494        let metadata = r.metadata;
2495        assert_eq!(
2496            metadata.encodings().collect::<Vec<_>>(),
2497            vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2498        );
2499        assert_eq!(metadata.num_values(), 4);
2500        assert_eq!(metadata.compressed_size(), 20);
2501        assert_eq!(metadata.uncompressed_size(), 20);
2502        assert_eq!(metadata.data_page_offset(), 0);
2503        assert_eq!(metadata.dictionary_page_offset(), Some(0));
2504        if let Some(stats) = metadata.statistics() {
2505            assert_eq!(stats.null_count_opt(), Some(0));
2506            assert_eq!(stats.distinct_count_opt().unwrap_or(0), 55);
2507            if let Statistics::Int32(stats) = stats {
2508                assert_eq!(stats.min_opt().unwrap(), &-17);
2509                assert_eq!(stats.max_opt().unwrap(), &9000);
2510            } else {
2511                panic!("expecting Statistics::Int32");
2512            }
2513        } else {
2514            panic!("metadata missing statistics");
2515        }
2516    }
2517
2518    #[test]
2519    fn test_mixed_precomputed_statistics() {
2520        let mut buf = Vec::with_capacity(100);
2521        let mut write = TrackedWrite::new(&mut buf);
2522        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2523        let props = Arc::new(
2524            WriterProperties::builder()
2525                .set_write_page_header_statistics(true)
2526                .build(),
2527        );
2528        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2529
2530        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2531        writer
2532            .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3))
2533            .unwrap();
2534
2535        let r = writer.close().unwrap();
2536
2537        let stats = r.metadata.statistics().unwrap();
2538        assert_eq!(stats.min_bytes_opt().unwrap(), 1_i32.to_le_bytes());
2539        assert_eq!(stats.max_bytes_opt().unwrap(), 7_i32.to_le_bytes());
2540        assert_eq!(stats.null_count_opt(), Some(0));
2541        assert!(stats.distinct_count_opt().is_none());
2542
2543        drop(write);
2544
2545        let props = ReaderProperties::builder()
2546            .set_backward_compatible_lz4(false)
2547            .set_read_page_statistics(true)
2548            .build();
2549        let reader = SerializedPageReader::new_with_properties(
2550            Arc::new(Bytes::from(buf)),
2551            &r.metadata,
2552            r.rows_written as usize,
2553            None,
2554            Arc::new(props),
2555        )
2556        .unwrap();
2557
2558        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2559        assert_eq!(pages.len(), 2);
2560
2561        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2562        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);
2563
2564        let page_statistics = pages[1].statistics().unwrap();
2565        assert_eq!(
2566            page_statistics.min_bytes_opt().unwrap(),
2567            1_i32.to_le_bytes()
2568        );
2569        assert_eq!(
2570            page_statistics.max_bytes_opt().unwrap(),
2571            7_i32.to_le_bytes()
2572        );
2573        assert_eq!(page_statistics.null_count_opt(), Some(0));
2574        assert!(page_statistics.distinct_count_opt().is_none());
2575    }
2576
2577    #[test]
2578    fn test_disabled_statistics() {
2579        let mut buf = Vec::with_capacity(100);
2580        let mut write = TrackedWrite::new(&mut buf);
2581        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2582        let props = WriterProperties::builder()
2583            .set_statistics_enabled(EnabledStatistics::None)
2584            .set_writer_version(WriterVersion::PARQUET_2_0)
2585            .build();
2586        let props = Arc::new(props);
2587
2588        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2589        writer
2590            .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
2591            .unwrap();
2592
2593        let r = writer.close().unwrap();
2594        assert!(r.metadata.statistics().is_none());
2595
2596        drop(write);
2597
2598        let props = ReaderProperties::builder()
2599            .set_backward_compatible_lz4(false)
2600            .build();
2601        let reader = SerializedPageReader::new_with_properties(
2602            Arc::new(Bytes::from(buf)),
2603            &r.metadata,
2604            r.rows_written as usize,
2605            None,
2606            Arc::new(props),
2607        )
2608        .unwrap();
2609
2610        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2611        assert_eq!(pages.len(), 2);
2612
2613        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2614        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2);
2615
2616        match &pages[1] {
2617            Page::DataPageV2 {
2618                num_values,
2619                num_nulls,
2620                num_rows,
2621                statistics,
2622                ..
2623            } => {
2624                assert_eq!(*num_values, 6);
2625                assert_eq!(*num_nulls, 2);
2626                assert_eq!(*num_rows, 6);
2627                assert!(statistics.is_none());
2628            }
2629            _ => unreachable!(),
2630        }
2631    }
2632
2633    #[test]
2634    fn test_column_writer_empty_column_roundtrip() {
2635        let props = Default::default();
2636        column_roundtrip::<Int32Type>(props, &[], None, None);
2637    }
2638
2639    #[test]
2640    fn test_column_writer_non_nullable_values_roundtrip() {
2641        let props = Default::default();
2642        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
2643    }
2644
2645    #[test]
2646    fn test_column_writer_nullable_non_repeated_values_roundtrip() {
2647        let props = Default::default();
2648        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 0);
2649    }
2650
2651    #[test]
2652    fn test_column_writer_nullable_repeated_values_roundtrip() {
2653        let props = Default::default();
2654        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2655    }
2656
2657    #[test]
2658    fn test_column_writer_dictionary_fallback_small_data_page() {
2659        let props = WriterProperties::builder()
2660            .set_dictionary_page_size_limit(32)
2661            .set_data_page_size_limit(32)
2662            .build();
2663        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2664    }
2665
2666    #[test]
2667    fn test_column_writer_small_write_batch_size() {
2668        for i in &[1usize, 2, 5, 10, 11, 1023] {
2669            let props = WriterProperties::builder().set_write_batch_size(*i).build();
2670
2671            column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2672        }
2673    }
2674
2675    #[test]
2676    fn test_column_writer_dictionary_disabled_v1() {
2677        let props = WriterProperties::builder()
2678            .set_writer_version(WriterVersion::PARQUET_1_0)
2679            .set_dictionary_enabled(false)
2680            .build();
2681        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2682    }
2683
2684    #[test]
2685    fn test_column_writer_dictionary_disabled_v2() {
2686        let props = WriterProperties::builder()
2687            .set_writer_version(WriterVersion::PARQUET_2_0)
2688            .set_dictionary_enabled(false)
2689            .build();
2690        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2691    }
2692
2693    #[test]
2694    fn test_column_writer_compression_v1() {
2695        let props = WriterProperties::builder()
2696            .set_writer_version(WriterVersion::PARQUET_1_0)
2697            .set_compression(Compression::SNAPPY)
2698            .build();
2699        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2700    }
2701
2702    #[test]
2703    fn test_column_writer_compression_v2() {
2704        let props = WriterProperties::builder()
2705            .set_writer_version(WriterVersion::PARQUET_2_0)
2706            .set_compression(Compression::SNAPPY)
2707            .build();
2708        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2709    }
2710
2711    #[test]
2712    fn test_column_writer_v2_compression_ratio_threshold() {
2713        fn write_v2_page(threshold: f64) -> bool {
2714            let mut buf = Vec::with_capacity(4096);
2715            let mut write = TrackedWrite::new(&mut buf);
2716            let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2717            let props = Arc::new(
2718                WriterProperties::builder()
2719                    .set_writer_version(WriterVersion::PARQUET_2_0)
2720                    .set_compression(Compression::SNAPPY)
2721                    .set_dictionary_enabled(false)
2722                    .set_data_page_v2_compression_ratio_threshold(threshold)
2723                    .build(),
2724            );
2725
2726            let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2727            let values: Vec<i32> = vec![42; 4096];
2728            writer.write_batch(&values, None, None).unwrap();
2729            let r = writer.close().unwrap();
2730            drop(write);
2731
2732            let reader_props = ReaderProperties::builder()
2733                .set_backward_compatible_lz4(false)
2734                .build();
2735            let reader = SerializedPageReader::new_with_properties(
2736                Arc::new(Bytes::from(buf)),
2737                &r.metadata,
2738                r.rows_written as usize,
2739                None,
2740                Arc::new(reader_props),
2741            )
2742            .unwrap();
2743            let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2744            let data_page = pages
2745                .iter()
2746                .find(|p| p.page_type() == PageType::DATA_PAGE_V2)
2747                .expect("expected a v2 data page");
2748            match data_page {
2749                Page::DataPageV2 { is_compressed, .. } => *is_compressed,
2750                _ => unreachable!(),
2751            }
2752        }
2753
2754        // Default threshold keeps the compressed buffer for constant data.
2755        assert!(write_v2_page(1.0));
2756        // A strict threshold (require >1000x reduction) discards it.
2757        assert!(!write_v2_page(0.001));
2758    }
2759
2760    #[test]
2761    fn test_column_writer_add_data_pages_with_dict() {
2762        // ARROW-5129: Test verifies that we add data page in case of dictionary encoding
2763        // and no fallback occurred so far.
2764        let mut file = tempfile::tempfile().unwrap();
2765        let mut write = TrackedWrite::new(&mut file);
2766        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2767        let props = Arc::new(
2768            WriterProperties::builder()
2769                .set_data_page_size_limit(10)
2770                .set_write_batch_size(3) // write 3 values at a time
2771                .build(),
2772        );
2773        let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
2774        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2775        writer.write_batch(data, None, None).unwrap();
2776        let r = writer.close().unwrap();
2777
2778        drop(write);
2779
2780        // Read pages and check the sequence
2781        let props = ReaderProperties::builder()
2782            .set_backward_compatible_lz4(false)
2783            .build();
2784        let mut page_reader = Box::new(
2785            SerializedPageReader::new_with_properties(
2786                Arc::new(file),
2787                &r.metadata,
2788                r.rows_written as usize,
2789                None,
2790                Arc::new(props),
2791            )
2792            .unwrap(),
2793        );
2794        let mut res = Vec::new();
2795        while let Some(page) = page_reader.get_next_page().unwrap() {
2796            res.push((page.page_type(), page.num_values(), page.buffer().len()));
2797        }
2798        assert_eq!(
2799            res,
2800            vec![
2801                (PageType::DICTIONARY_PAGE, 10, 40),
2802                (PageType::DATA_PAGE, 9, 10),
2803                (PageType::DATA_PAGE, 1, 3),
2804            ]
2805        );
2806        assert_eq!(
2807            r.metadata.page_encoding_stats(),
2808            Some(&vec![
2809                PageEncodingStats {
2810                    page_type: PageType::DICTIONARY_PAGE,
2811                    encoding: Encoding::PLAIN,
2812                    count: 1
2813                },
2814                PageEncodingStats {
2815                    page_type: PageType::DATA_PAGE,
2816                    encoding: Encoding::RLE_DICTIONARY,
2817                    count: 2,
2818                }
2819            ])
2820        );
2821    }
2822
2823    #[test]
2824    fn test_column_writer_column_data_page_size_limit() {
2825        let props = Arc::new(
2826            WriterProperties::builder()
2827                .set_writer_version(WriterVersion::PARQUET_1_0)
2828                .set_dictionary_enabled(false)
2829                .set_data_page_size_limit(1000)
2830                .set_column_data_page_size_limit(ColumnPath::from("col"), 10)
2831                .set_write_batch_size(3)
2832                .build(),
2833        );
2834        let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
2835
2836        let col_values =
2837            write_and_collect_page_values(ColumnPath::from("col"), Arc::clone(&props), data);
2838        let other_values = write_and_collect_page_values(ColumnPath::from("other"), props, data);
2839
2840        assert_eq!(col_values, vec![3, 3, 3, 1]);
2841        assert_eq!(other_values, vec![10]);
2842    }
2843
2844    #[test]
2845    fn test_column_writer_caps_page_size_for_large_byte_array_values() {
2846        // Regression: the post-write data page byte limit check only fires
2847        // at mini-batch boundaries, so a 1024-row mini-batch of multi-MiB
2848        // BYTE_ARRAY values used to buffer multiple GiB into a single page
2849        // before the limit was even consulted. With the threshold-based
2850        // granular mode this batch should split into ~one page per value.
2851        let value_size = 64 * 1024; // 64 KiB per value
2852        let page_byte_limit = 16 * 1024; // 16 KiB page limit
2853        let num_rows = 64;
2854
2855        let props = WriterProperties::builder()
2856            .set_writer_version(WriterVersion::PARQUET_1_0)
2857            .set_dictionary_enabled(false)
2858            .set_encoding(Encoding::PLAIN)
2859            .set_data_page_size_limit(page_byte_limit)
2860            // Default write_batch_size (1024) — without the fix this
2861            // buffers the entire input into a single ~4 MiB page.
2862            .build();
2863
2864        let data: Vec<_> = (0..num_rows)
2865            .map(|i| ByteArray::from(vec![i as u8; value_size]))
2866            .collect();
2867        let pages = write_and_collect_pages::<ByteArrayType>(props, 0, 0, &data, None, None);
2868
2869        // Every value must end up somewhere.
2870        let total_values: u32 = pages.data_pages.iter().map(|(_, n)| n).sum();
2871        assert_eq!(total_values as usize, num_rows);
2872        // Without the fix this assertion fired with one ~4 MiB page; the
2873        // threshold splits the input so that no page holds more than a
2874        // single oversized value's worth of bytes.
2875        assert!(
2876            pages.data_pages.len() >= num_rows / 2,
2877            "expected pages to be cut close to one per value, got {:?}",
2878            pages.data_pages,
2879        );
2880        // Each page must be bounded by roughly one value's worth of bytes;
2881        // parquet allows a single oversized value to occupy a page by
2882        // itself but never lets us pile many of them together.
2883        for (size, _) in &pages.data_pages {
2884            assert!(
2885                *size <= value_size + 64,
2886                "page size {size} exceeds one-value bound ({}B) — pages {:?}",
2887                value_size + 64,
2888                pages.data_pages,
2889            );
2890        }
2891    }
2892
2893    #[test]
2894    fn test_column_writer_caps_page_size_for_large_values_in_list() {
2895        // Coverage for the Materialized-rep branch of
2896        // `write_granular_chunk`. The flat-column regression test
2897        // exercises the per-level step; this exercises the
2898        // record-by-record step used when rep levels are present.
2899        //
2900        // Column is `list<required binary>` (max_def = 1, max_rep = 1)
2901        // with 3 records of 3 large blobs each. The page byte limit is
2902        // smaller than a single blob, so granular mode kicks in, and the
2903        // Materialized-rep arm of `write_granular_chunk` steps from one
2904        // `rep == 0` boundary to the next so a record never spans pages.
2905        let value_size = 32 * 1024;
2906        let page_byte_limit = 16 * 1024;
2907        let values_per_record = 3;
2908        let num_records = 3;
2909        let num_values = values_per_record * num_records;
2910
2911        // rep levels: 0, 1, 1, 0, 1, 1, 0, 1, 1
2912        let mut rep_levels = Vec::with_capacity(num_values);
2913        for _ in 0..num_records {
2914            rep_levels.push(0i16);
2915            rep_levels.extend(std::iter::repeat_n(1i16, values_per_record - 1));
2916        }
2917        let def_levels = vec![1i16; num_values];
2918
2919        let props = WriterProperties::builder()
2920            .set_writer_version(WriterVersion::PARQUET_1_0)
2921            .set_dictionary_enabled(false)
2922            .set_encoding(Encoding::PLAIN)
2923            .set_data_page_size_limit(page_byte_limit)
2924            .build();
2925
2926        let data: Vec<_> = (0..num_values)
2927            .map(|i| ByteArray::from(vec![i as u8; value_size]))
2928            .collect();
2929        let pages = write_and_collect_pages::<ByteArrayType>(
2930            props,
2931            1,
2932            1,
2933            &data,
2934            Some(&def_levels),
2935            Some(&rep_levels),
2936        );
2937        let data_pages = pages.data_pages;
2938
2939        // The Materialized-rep arm groups levels by record, and each
2940        // record's bytes blow the page byte limit on its own, so we get
2941        // exactly one page per record.
2942        assert_eq!(
2943            data_pages.len(),
2944            num_records,
2945            "expected one data page per record, got {data_pages:?}"
2946        );
2947        for (bytes, n_values) in &data_pages {
2948            assert_eq!(
2949                *n_values as usize, values_per_record,
2950                "each page must hold a whole record's leaves, got {data_pages:?}"
2951            );
2952            // Each page is one full record (its leaves cannot be split),
2953            // so allow up to `values_per_record` blobs of payload plus a
2954            // small fudge for level encoding overhead.
2955            let upper_bound = values_per_record * (value_size + 16);
2956            assert!(
2957                *bytes <= upper_bound,
2958                "page size {bytes} exceeds whole-record bound ({upper_bound}); pages {data_pages:?}"
2959            );
2960        }
2961    }
2962
2963    #[test]
2964    fn test_column_writer_caps_page_size_with_nullable_large_values() {
2965        // Coverage for `LevelDataRef::value_count` on Materialized def
2966        // levels: a nullable column with mixed nulls and large values.
2967        // `value_count` must return the actual non-null count so the
2968        // byte estimate reflects bytes that will actually be written,
2969        // not the level count.
2970        let value_size = 32 * 1024;
2971        let page_byte_limit = 16 * 1024;
2972        let num_levels = 32;
2973
2974        // Alternating null / non-null: 16 nulls and 16 values.
2975        let def_levels: Vec<i16> = (0..num_levels as i16).map(|i| i % 2).collect();
2976        let num_values = def_levels.iter().filter(|&&d| d == 1).count();
2977
2978        let props = WriterProperties::builder()
2979            .set_writer_version(WriterVersion::PARQUET_1_0)
2980            .set_dictionary_enabled(false)
2981            .set_encoding(Encoding::PLAIN)
2982            .set_data_page_size_limit(page_byte_limit)
2983            .build();
2984
2985        let data: Vec<_> = (0..num_values)
2986            .map(|i| ByteArray::from(vec![i as u8; value_size]))
2987            .collect();
2988        let pages =
2989            write_and_collect_pages::<ByteArrayType>(props, 1, 0, &data, Some(&def_levels), None);
2990        let data_pages: Vec<_> = pages.data_pages.iter().map(|(size, _)| *size).collect();
2991
2992        // With 16 actual values of 32 KiB each and a 16 KiB page limit,
2993        // every non-null value should get its own page (plus possibly
2994        // adjacent nulls). At minimum, the number of pages must be
2995        // roughly the value count, not 1 (which is what `main` produced).
2996        assert!(
2997            data_pages.len() >= num_values / 2,
2998            "expected at least {} pages for {num_values} large values, got {} pages: {data_pages:?}",
2999            num_values / 2,
3000            data_pages.len(),
3001        );
3002        // No page contains more than ~one value's worth of payload bytes.
3003        for size in &data_pages {
3004            assert!(
3005                *size <= value_size + 64,
3006                "page size {size} exceeds one-value bound; pages {data_pages:?}"
3007            );
3008        }
3009    }
3010
3011    #[test]
3012    fn test_column_writer_dict_enabled_large_values_post_spill() {
3013        // While dictionary encoding is active, `has_dictionary()` short-
3014        // circuits `estimated_value_bytes` — the byte estimate is plain-
3015        // encoded size but dict-encoded pages only store small RLE
3016        // indices, so we'd otherwise shrink pages spuriously. Once the
3017        // dictionary spills (each value is large + unique), plain
3018        // encoding takes over and the byte-budget sub-batch kicks in.
3019        //
3020        // This test makes sure the writer survives that transition and
3021        // produces bounded pages thereafter.
3022        let value_size = 64 * 1024;
3023        let page_byte_limit = 16 * 1024;
3024        let num_rows = 32;
3025
3026        let props = WriterProperties::builder()
3027            .set_writer_version(WriterVersion::PARQUET_1_0)
3028            .set_dictionary_enabled(true)
3029            // Force a small dict so it spills quickly even though
3030            // each value here is unique.
3031            .set_dictionary_page_size_limit(1024)
3032            .set_data_page_size_limit(page_byte_limit)
3033            // Small mini-batches so dict fallback happens part-way
3034            // through the input, leaving subsequent mini-batches to
3035            // exercise the post-spill plain-encoding path that the
3036            // page-size fix actually targets.
3037            .set_write_batch_size(4)
3038            .build();
3039
3040        let data: Vec<_> = (0..num_rows)
3041            .map(|i| ByteArray::from(vec![i as u8; value_size]))
3042            .collect();
3043        let pages = write_and_collect_pages::<ByteArrayType>(props, 0, 0, &data, None, None);
3044        let data_pages: Vec<_> = pages.data_pages.iter().map(|(size, _)| *size).collect();
3045
3046        // After spill, plain encoding writes one ~64 KiB value per page.
3047        // Without the fix, post-spill writes still buffered all 32
3048        // values into a single ~2 MiB page.
3049        assert!(
3050            data_pages.len() >= num_rows / 2,
3051            "expected >= {} data pages after dict spill, got {} ({data_pages:?})",
3052            num_rows / 2,
3053            data_pages.len(),
3054        );
3055        for size in &data_pages {
3056            assert!(
3057                *size <= value_size + 64,
3058                "page size {size} exceeds one-value bound; pages {data_pages:?}"
3059            );
3060        }
3061    }
3062
3063    #[test]
3064    fn test_column_writer_caps_dictionary_page_size() {
3065        // A column of large *distinct* values with dictionary encoding on:
3066        // the dictionary page accumulates the values themselves, and its
3067        // spill check runs only once per mini-batch. Without bounding the
3068        // dictionary-encoding mini-batch, one `write_batch_size` mini-batch
3069        // would intern `write_batch_size * value_size` bytes into the
3070        // dictionary page before the check fires (~16 MiB here). The chunker
3071        // must sub-batch the dictionary-encoding phase too.
3072        let value_size = 8 * 1024;
3073        let dict_page_limit = 64 * 1024;
3074        let num_rows = 2048;
3075
3076        let props = WriterProperties::builder()
3077            .set_writer_version(WriterVersion::PARQUET_1_0)
3078            .set_dictionary_enabled(true)
3079            .set_dictionary_page_size_limit(dict_page_limit)
3080            .build();
3081
3082        let data: Vec<_> = (0..num_rows)
3083            .map(|i| {
3084                // each value distinct, so the dictionary cannot dedup them
3085                let mut v = vec![0u8; value_size];
3086                v[..8].copy_from_slice(&(i as u64).to_le_bytes());
3087                ByteArray::from(v)
3088            })
3089            .collect();
3090        let pages = write_and_collect_pages::<ByteArrayType>(props, 0, 0, &data, None, None);
3091        let dict_page_size = pages.dict_page_size;
3092
3093        assert!(
3094            dict_page_size > 0,
3095            "expected the column to dictionary-encode"
3096        );
3097        // Bounded near the limit (~2x from the post-mini-batch check). Before
3098        // the fix the dictionary page reached num_rows * value_size (~16 MiB,
3099        // 256x the limit).
3100        assert!(
3101            dict_page_size <= 3 * dict_page_limit,
3102            "dictionary page {dict_page_size} exceeds 3x the {dict_page_limit} limit",
3103        );
3104    }
3105
3106    #[test]
3107    fn test_column_writer_caps_page_size_for_fixed_len_byte_array() {
3108        // Coverage for `ParquetValueType::byte_size` override on
3109        // `FixedLenByteArray`. With `type_length = 1`, each plain-encoded
3110        // value is one byte, so a 4-byte page byte limit forces the
3111        // sub-batch sizer to write ~4 values per page rather than one
3112        // page for the whole batch.
3113        let page_byte_limit = 4;
3114        let num_values = 128;
3115
3116        let props = WriterProperties::builder()
3117            .set_writer_version(WriterVersion::PARQUET_1_0)
3118            .set_dictionary_enabled(false)
3119            .set_encoding(Encoding::PLAIN)
3120            .set_data_page_size_limit(page_byte_limit)
3121            .build();
3122
3123        let data: Vec<_> = (0..num_values)
3124            .map(|i| {
3125                let mut fla = FixedLenByteArray::default();
3126                fla.set_data(Bytes::from(vec![i as u8]));
3127                fla
3128            })
3129            .collect();
3130        let pages =
3131            write_and_collect_pages::<FixedLenByteArrayType>(props, 0, 0, &data, None, None);
3132        let data_pages: Vec<_> = pages.data_pages.iter().map(|(size, _)| *size).collect();
3133
3134        // Without the fix this is a single 128-byte page; with the fix
3135        // the byte budget caps each page at ~`page_byte_limit` bytes.
3136        assert!(
3137            data_pages.len() >= num_values / 8,
3138            "expected pages capped by byte budget, got {data_pages:?}"
3139        );
3140        for size in &data_pages {
3141            assert!(
3142                *size <= page_byte_limit * 4,
3143                "page size {size} larger than expected; pages {data_pages:?}"
3144            );
3145        }
3146    }
3147
3148    #[test]
3149    fn test_bool_statistics() {
3150        let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
3151        // Booleans have an unsigned sort order and so are not compatible
3152        // with the deprecated `min` and `max` statistics
3153        assert!(!stats.is_min_max_backwards_compatible());
3154        if let Statistics::Boolean(stats) = stats {
3155            assert_eq!(stats.min_opt().unwrap(), &false);
3156            assert_eq!(stats.max_opt().unwrap(), &true);
3157        } else {
3158            panic!("expecting Statistics::Boolean, got {stats:?}");
3159        }
3160    }
3161
3162    #[test]
3163    fn test_int32_statistics() {
3164        let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
3165        assert!(stats.is_min_max_backwards_compatible());
3166        if let Statistics::Int32(stats) = stats {
3167            assert_eq!(stats.min_opt().unwrap(), &-2);
3168            assert_eq!(stats.max_opt().unwrap(), &3);
3169        } else {
3170            panic!("expecting Statistics::Int32, got {stats:?}");
3171        }
3172    }
3173
3174    #[test]
3175    fn test_int64_statistics() {
3176        let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
3177        assert!(stats.is_min_max_backwards_compatible());
3178        if let Statistics::Int64(stats) = stats {
3179            assert_eq!(stats.min_opt().unwrap(), &-2);
3180            assert_eq!(stats.max_opt().unwrap(), &3);
3181        } else {
3182            panic!("expecting Statistics::Int64, got {stats:?}");
3183        }
3184    }
3185
3186    #[test]
3187    fn test_int96_statistics() {
3188        let input = vec![
3189            Int96::from(vec![1, 20, 30]),
3190            Int96::from(vec![3, 20, 10]),
3191            Int96::from(vec![0, 20, 30]),
3192            Int96::from(vec![2, 20, 30]),
3193        ]
3194        .into_iter()
3195        .collect::<Vec<Int96>>();
3196
3197        let stats = statistics_roundtrip::<Int96Type>(&input);
3198        assert!(!stats.is_min_max_backwards_compatible());
3199        if let Statistics::Int96(stats) = stats {
3200            assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
3201            assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![2, 20, 30]));
3202        } else {
3203            panic!("expecting Statistics::Int96, got {stats:?}");
3204        }
3205    }
3206
3207    #[test]
3208    fn test_float_statistics() {
3209        let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]);
3210        assert!(stats.is_min_max_backwards_compatible());
3211        if let Statistics::Float(stats) = stats {
3212            assert_eq!(stats.min_opt().unwrap(), &-2.0);
3213            assert_eq!(stats.max_opt().unwrap(), &3.0);
3214        } else {
3215            panic!("expecting Statistics::Float, got {stats:?}");
3216        }
3217    }
3218
3219    #[test]
3220    fn test_double_statistics() {
3221        let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]);
3222        assert!(stats.is_min_max_backwards_compatible());
3223        if let Statistics::Double(stats) = stats {
3224            assert_eq!(stats.min_opt().unwrap(), &-2.0);
3225            assert_eq!(stats.max_opt().unwrap(), &3.0);
3226        } else {
3227            panic!("expecting Statistics::Double, got {stats:?}");
3228        }
3229    }
3230
3231    #[test]
3232    fn test_byte_array_statistics() {
3233        let input = ["aawaa", "zz", "aaw", "m", "qrs"]
3234            .iter()
3235            .map(|&s| s.into())
3236            .collect::<Vec<_>>();
3237
3238        let stats = statistics_roundtrip::<ByteArrayType>(&input);
3239        assert!(!stats.is_min_max_backwards_compatible());
3240        if let Statistics::ByteArray(stats) = stats {
3241            assert_eq!(stats.min_opt().unwrap(), &ByteArray::from("aaw"));
3242            assert_eq!(stats.max_opt().unwrap(), &ByteArray::from("zz"));
3243        } else {
3244            panic!("expecting Statistics::ByteArray, got {stats:?}");
3245        }
3246    }
3247
3248    #[test]
3249    fn test_fixed_len_byte_array_statistics() {
3250        let input = ["aawaa", "zz   ", "aaw  ", "m    ", "qrs  "]
3251            .iter()
3252            .map(|&s| ByteArray::from(s).into())
3253            .collect::<Vec<_>>();
3254
3255        let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
3256        assert!(!stats.is_min_max_backwards_compatible());
3257        if let Statistics::FixedLenByteArray(stats) = stats {
3258            let expected_min: FixedLenByteArray = ByteArray::from("aaw  ").into();
3259            assert_eq!(stats.min_opt().unwrap(), &expected_min);
3260            let expected_max: FixedLenByteArray = ByteArray::from("zz   ").into();
3261            assert_eq!(stats.max_opt().unwrap(), &expected_max);
3262        } else {
3263            panic!("expecting Statistics::FixedLenByteArray, got {stats:?}");
3264        }
3265    }
3266
3267    #[test]
3268    fn test_column_writer_check_float16_min_max() {
3269        let input = [
3270            -f16::ONE,
3271            f16::from_f32(3.0),
3272            -f16::from_f32(2.0),
3273            f16::from_f32(2.0),
3274        ]
3275        .into_iter()
3276        .map(|s| ByteArray::from(s).into())
3277        .collect::<Vec<_>>();
3278
3279        let stats = float16_statistics_roundtrip(&input);
3280        assert!(stats.is_min_max_backwards_compatible());
3281        assert_eq!(
3282            stats.min_opt().unwrap(),
3283            &ByteArray::from(-f16::from_f32(2.0))
3284        );
3285        assert_eq!(
3286            stats.max_opt().unwrap(),
3287            &ByteArray::from(f16::from_f32(3.0))
3288        );
3289    }
3290
3291    #[test]
3292    fn test_column_writer_check_float16_nan_middle() {
3293        let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
3294            .into_iter()
3295            .map(|s| ByteArray::from(s).into())
3296            .collect::<Vec<_>>();
3297
3298        let stats = float16_statistics_roundtrip(&input);
3299        assert!(stats.is_min_max_backwards_compatible());
3300        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
3301        assert_eq!(
3302            stats.max_opt().unwrap(),
3303            &ByteArray::from(f16::ONE + f16::ONE)
3304        );
3305    }
3306
3307    #[test]
3308    fn test_float16_statistics_nan_middle() {
3309        let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
3310            .into_iter()
3311            .map(|s| ByteArray::from(s).into())
3312            .collect::<Vec<_>>();
3313
3314        let stats = float16_statistics_roundtrip(&input);
3315        assert!(stats.is_min_max_backwards_compatible());
3316        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
3317        assert_eq!(
3318            stats.max_opt().unwrap(),
3319            &ByteArray::from(f16::ONE + f16::ONE)
3320        );
3321    }
3322
3323    #[test]
3324    fn test_float16_statistics_nan_start() {
3325        let input = [f16::NAN, f16::ONE, f16::ONE + f16::ONE]
3326            .into_iter()
3327            .map(|s| ByteArray::from(s).into())
3328            .collect::<Vec<_>>();
3329
3330        let stats = float16_statistics_roundtrip(&input);
3331        assert!(stats.is_min_max_backwards_compatible());
3332        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
3333        assert_eq!(
3334            stats.max_opt().unwrap(),
3335            &ByteArray::from(f16::ONE + f16::ONE)
3336        );
3337    }
3338
3339    #[test]
3340    fn test_float16_statistics_nan_only() {
3341        let input = [f16::NAN, f16::NAN]
3342            .into_iter()
3343            .map(|s| ByteArray::from(s).into())
3344            .collect::<Vec<_>>();
3345
3346        let stats = float16_statistics_roundtrip(&input);
3347        assert!(stats.min_bytes_opt().is_none());
3348        assert!(stats.max_bytes_opt().is_none());
3349        assert!(stats.is_min_max_backwards_compatible());
3350    }
3351
3352    #[test]
3353    fn test_float16_statistics_zero_only() {
3354        let input = [f16::ZERO]
3355            .into_iter()
3356            .map(|s| ByteArray::from(s).into())
3357            .collect::<Vec<_>>();
3358
3359        let stats = float16_statistics_roundtrip(&input);
3360        assert!(stats.is_min_max_backwards_compatible());
3361        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
3362        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
3363    }
3364
3365    #[test]
3366    fn test_float16_statistics_neg_zero_only() {
3367        let input = [f16::NEG_ZERO]
3368            .into_iter()
3369            .map(|s| ByteArray::from(s).into())
3370            .collect::<Vec<_>>();
3371
3372        let stats = float16_statistics_roundtrip(&input);
3373        assert!(stats.is_min_max_backwards_compatible());
3374        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
3375        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
3376    }
3377
3378    #[test]
3379    fn test_float16_statistics_zero_min() {
3380        let input = [f16::ZERO, f16::ONE, f16::NAN, f16::PI]
3381            .into_iter()
3382            .map(|s| ByteArray::from(s).into())
3383            .collect::<Vec<_>>();
3384
3385        let stats = float16_statistics_roundtrip(&input);
3386        assert!(stats.is_min_max_backwards_compatible());
3387        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
3388        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::PI));
3389    }
3390
3391    #[test]
3392    fn test_float16_statistics_neg_zero_max() {
3393        let input = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, -f16::PI]
3394            .into_iter()
3395            .map(|s| ByteArray::from(s).into())
3396            .collect::<Vec<_>>();
3397
3398        let stats = float16_statistics_roundtrip(&input);
3399        assert!(stats.is_min_max_backwards_compatible());
3400        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-f16::PI));
3401        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
3402    }
3403
3404    #[test]
3405    fn test_float_statistics_nan_middle() {
3406        let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
3407        assert!(stats.is_min_max_backwards_compatible());
3408        if let Statistics::Float(stats) = stats {
3409            assert_eq!(stats.min_opt().unwrap(), &1.0);
3410            assert_eq!(stats.max_opt().unwrap(), &2.0);
3411        } else {
3412            panic!("expecting Statistics::Float");
3413        }
3414    }
3415
3416    #[test]
3417    fn test_float_statistics_nan_start() {
3418        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
3419        assert!(stats.is_min_max_backwards_compatible());
3420        if let Statistics::Float(stats) = stats {
3421            assert_eq!(stats.min_opt().unwrap(), &1.0);
3422            assert_eq!(stats.max_opt().unwrap(), &2.0);
3423        } else {
3424            panic!("expecting Statistics::Float");
3425        }
3426    }
3427
3428    #[test]
3429    fn test_float_statistics_nan_only() {
3430        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, f32::NAN]);
3431        assert!(stats.min_bytes_opt().is_none());
3432        assert!(stats.max_bytes_opt().is_none());
3433        assert!(stats.is_min_max_backwards_compatible());
3434        assert!(matches!(stats, Statistics::Float(_)));
3435    }
3436
3437    #[test]
3438    fn test_float_statistics_zero_only() {
3439        let stats = statistics_roundtrip::<FloatType>(&[0.0]);
3440        assert!(stats.is_min_max_backwards_compatible());
3441        if let Statistics::Float(stats) = stats {
3442            assert_eq!(stats.min_opt().unwrap(), &-0.0);
3443            assert!(stats.min_opt().unwrap().is_sign_negative());
3444            assert_eq!(stats.max_opt().unwrap(), &0.0);
3445            assert!(stats.max_opt().unwrap().is_sign_positive());
3446        } else {
3447            panic!("expecting Statistics::Float");
3448        }
3449    }
3450
3451    #[test]
3452    fn test_float_statistics_neg_zero_only() {
3453        let stats = statistics_roundtrip::<FloatType>(&[-0.0]);
3454        assert!(stats.is_min_max_backwards_compatible());
3455        if let Statistics::Float(stats) = stats {
3456            assert_eq!(stats.min_opt().unwrap(), &-0.0);
3457            assert!(stats.min_opt().unwrap().is_sign_negative());
3458            assert_eq!(stats.max_opt().unwrap(), &0.0);
3459            assert!(stats.max_opt().unwrap().is_sign_positive());
3460        } else {
3461            panic!("expecting Statistics::Float");
3462        }
3463    }
3464
3465    #[test]
3466    fn test_float_statistics_zero_min() {
3467        let stats = statistics_roundtrip::<FloatType>(&[0.0, 1.0, f32::NAN, 2.0]);
3468        assert!(stats.is_min_max_backwards_compatible());
3469        if let Statistics::Float(stats) = stats {
3470            assert_eq!(stats.min_opt().unwrap(), &-0.0);
3471            assert!(stats.min_opt().unwrap().is_sign_negative());
3472            assert_eq!(stats.max_opt().unwrap(), &2.0);
3473        } else {
3474            panic!("expecting Statistics::Float");
3475        }
3476    }
3477
3478    #[test]
3479    fn test_float_statistics_neg_zero_max() {
3480        let stats = statistics_roundtrip::<FloatType>(&[-0.0, -1.0, f32::NAN, -2.0]);
3481        assert!(stats.is_min_max_backwards_compatible());
3482        if let Statistics::Float(stats) = stats {
3483            assert_eq!(stats.min_opt().unwrap(), &-2.0);
3484            assert_eq!(stats.max_opt().unwrap(), &0.0);
3485            assert!(stats.max_opt().unwrap().is_sign_positive());
3486        } else {
3487            panic!("expecting Statistics::Float");
3488        }
3489    }
3490
3491    #[test]
3492    fn test_double_statistics_nan_middle() {
3493        let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
3494        assert!(stats.is_min_max_backwards_compatible());
3495        if let Statistics::Double(stats) = stats {
3496            assert_eq!(stats.min_opt().unwrap(), &1.0);
3497            assert_eq!(stats.max_opt().unwrap(), &2.0);
3498        } else {
3499            panic!("expecting Statistics::Double");
3500        }
3501    }
3502
3503    #[test]
3504    fn test_double_statistics_nan_start() {
3505        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
3506        assert!(stats.is_min_max_backwards_compatible());
3507        if let Statistics::Double(stats) = stats {
3508            assert_eq!(stats.min_opt().unwrap(), &1.0);
3509            assert_eq!(stats.max_opt().unwrap(), &2.0);
3510        } else {
3511            panic!("expecting Statistics::Double");
3512        }
3513    }
3514
3515    #[test]
3516    fn test_double_statistics_nan_only() {
3517        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, f64::NAN]);
3518        assert!(stats.min_bytes_opt().is_none());
3519        assert!(stats.max_bytes_opt().is_none());
3520        assert!(matches!(stats, Statistics::Double(_)));
3521        assert!(stats.is_min_max_backwards_compatible());
3522    }
3523
3524    #[test]
3525    fn test_double_statistics_zero_only() {
3526        let stats = statistics_roundtrip::<DoubleType>(&[0.0]);
3527        assert!(stats.is_min_max_backwards_compatible());
3528        if let Statistics::Double(stats) = stats {
3529            assert_eq!(stats.min_opt().unwrap(), &-0.0);
3530            assert!(stats.min_opt().unwrap().is_sign_negative());
3531            assert_eq!(stats.max_opt().unwrap(), &0.0);
3532            assert!(stats.max_opt().unwrap().is_sign_positive());
3533        } else {
3534            panic!("expecting Statistics::Double");
3535        }
3536    }
3537
3538    #[test]
3539    fn test_double_statistics_neg_zero_only() {
3540        let stats = statistics_roundtrip::<DoubleType>(&[-0.0]);
3541        assert!(stats.is_min_max_backwards_compatible());
3542        if let Statistics::Double(stats) = stats {
3543            assert_eq!(stats.min_opt().unwrap(), &-0.0);
3544            assert!(stats.min_opt().unwrap().is_sign_negative());
3545            assert_eq!(stats.max_opt().unwrap(), &0.0);
3546            assert!(stats.max_opt().unwrap().is_sign_positive());
3547        } else {
3548            panic!("expecting Statistics::Double");
3549        }
3550    }
3551
3552    #[test]
3553    fn test_double_statistics_zero_min() {
3554        let stats = statistics_roundtrip::<DoubleType>(&[0.0, 1.0, f64::NAN, 2.0]);
3555        assert!(stats.is_min_max_backwards_compatible());
3556        if let Statistics::Double(stats) = stats {
3557            assert_eq!(stats.min_opt().unwrap(), &-0.0);
3558            assert!(stats.min_opt().unwrap().is_sign_negative());
3559            assert_eq!(stats.max_opt().unwrap(), &2.0);
3560        } else {
3561            panic!("expecting Statistics::Double");
3562        }
3563    }
3564
3565    #[test]
3566    fn test_double_statistics_neg_zero_max() {
3567        let stats = statistics_roundtrip::<DoubleType>(&[-0.0, -1.0, f64::NAN, -2.0]);
3568        assert!(stats.is_min_max_backwards_compatible());
3569        if let Statistics::Double(stats) = stats {
3570            assert_eq!(stats.min_opt().unwrap(), &-2.0);
3571            assert_eq!(stats.max_opt().unwrap(), &0.0);
3572            assert!(stats.max_opt().unwrap().is_sign_positive());
3573        } else {
3574            panic!("expecting Statistics::Double");
3575        }
3576    }
3577
3578    #[test]
3579    fn test_compare_greater_byte_array_decimals() {
3580        assert!(!compare_greater_byte_array_decimals(&[], &[],),);
3581        assert!(compare_greater_byte_array_decimals(&[1u8,], &[],),);
3582        assert!(!compare_greater_byte_array_decimals(&[], &[1u8,],),);
3583        assert!(compare_greater_byte_array_decimals(&[1u8,], &[0u8,],),);
3584        assert!(!compare_greater_byte_array_decimals(&[1u8,], &[1u8,],),);
3585        assert!(compare_greater_byte_array_decimals(&[1u8, 0u8,], &[0u8,],),);
3586        assert!(!compare_greater_byte_array_decimals(
3587            &[0u8, 1u8,],
3588            &[1u8, 0u8,],
3589        ),);
3590        assert!(!compare_greater_byte_array_decimals(
3591            &[255u8, 35u8, 0u8, 0u8,],
3592            &[0u8,],
3593        ),);
3594        assert!(compare_greater_byte_array_decimals(
3595            &[0u8,],
3596            &[255u8, 35u8, 0u8, 0u8,],
3597        ),);
3598    }
3599
3600    #[test]
3601    fn test_column_index_with_null_pages() {
3602        // write a single page of all nulls
3603        let page_writer = get_test_page_writer();
3604        let props = Default::default();
3605        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
3606        writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap();
3607
3608        let r = writer.close().unwrap();
3609        assert!(r.column_index.is_some());
3610        let col_idx = r.column_index.unwrap();
3611        let col_idx = match col_idx {
3612            ColumnIndexMetaData::INT32(col_idx) => col_idx,
3613            _ => panic!("wrong stats type"),
3614        };
3615        // null_pages should be true for page 0
3616        assert!(col_idx.is_null_page(0));
3617        // min and max should be empty byte arrays
3618        assert!(col_idx.min_value(0).is_none());
3619        assert!(col_idx.max_value(0).is_none());
3620        // null_counts should be defined and be 4 for page 0
3621        assert!(col_idx.null_count(0).is_some());
3622        assert_eq!(col_idx.null_count(0), Some(4));
3623        // there is no repetition so rep histogram should be absent
3624        assert!(col_idx.repetition_level_histogram(0).is_none());
3625        // definition_level_histogram should be present and should be 0:4, 1:0
3626        assert!(col_idx.definition_level_histogram(0).is_some());
3627        assert_eq!(col_idx.definition_level_histogram(0).unwrap(), &[4, 0]);
3628    }
3629
3630    #[test]
3631    fn test_column_offset_index_metadata() {
3632        // write data
3633        // and check the offset index and column index
3634        let page_writer = get_test_page_writer();
3635        let props = Default::default();
3636        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
3637        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
3638        // first page
3639        writer.flush_data_pages().unwrap();
3640        // second page
3641        writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
3642
3643        let r = writer.close().unwrap();
3644        let column_index = r.column_index.unwrap();
3645        let offset_index = r.offset_index.unwrap();
3646
3647        assert_eq!(8, r.rows_written);
3648
3649        // column index
3650        let column_index = match column_index {
3651            ColumnIndexMetaData::INT32(column_index) => column_index,
3652            _ => panic!("wrong stats type"),
3653        };
3654        assert_eq!(2, column_index.num_pages());
3655        assert_eq!(2, offset_index.page_locations.len());
3656        assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
3657        for idx in 0..2 {
3658            assert!(!column_index.is_null_page(idx));
3659            assert_eq!(0, column_index.null_count(0).unwrap());
3660        }
3661
3662        if let Some(stats) = r.metadata.statistics() {
3663            assert_eq!(stats.null_count_opt(), Some(0));
3664            assert_eq!(stats.distinct_count_opt(), None);
3665            if let Statistics::Int32(stats) = stats {
3666                // first page is [1,2,3,4]
3667                // second page is [-5,2,4,8]
3668                // note that we don't increment here, as this is a non BinaryArray type.
3669                assert_eq!(stats.min_opt(), column_index.min_value(1));
3670                assert_eq!(stats.max_opt(), column_index.max_value(1));
3671            } else {
3672                panic!("expecting Statistics::Int32");
3673            }
3674        } else {
3675            panic!("metadata missing statistics");
3676        }
3677
3678        // page location
3679        assert_eq!(0, offset_index.page_locations[0].first_row_index);
3680        assert_eq!(4, offset_index.page_locations[1].first_row_index);
3681    }
3682
3683    /// Verify min/max value truncation in the column index works as expected
3684    #[test]
3685    fn test_column_offset_index_metadata_truncating() {
3686        // write data
3687        // and check the offset index and column index
3688        let page_writer = get_test_page_writer();
3689        let props = WriterProperties::builder()
3690            .set_statistics_truncate_length(None) // disable column index truncation
3691            .build()
3692            .into();
3693        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3694
3695        let mut data = vec![FixedLenByteArray::default(); 3];
3696        // This is the expected min value - "aaa..."
3697        data[0].set_data(Bytes::from(vec![97_u8; 200]));
3698        // This is the expected max value - "ZZZ..."
3699        data[1].set_data(Bytes::from(vec![112_u8; 200]));
3700        data[2].set_data(Bytes::from(vec![98_u8; 200]));
3701
3702        writer.write_batch(&data, None, None).unwrap();
3703
3704        writer.flush_data_pages().unwrap();
3705
3706        let r = writer.close().unwrap();
3707        let column_index = r.column_index.unwrap();
3708        let offset_index = r.offset_index.unwrap();
3709
3710        let column_index = match column_index {
3711            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3712            _ => panic!("wrong stats type"),
3713        };
3714
3715        assert_eq!(3, r.rows_written);
3716
3717        // column index
3718        assert_eq!(1, column_index.num_pages());
3719        assert_eq!(1, offset_index.page_locations.len());
3720        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
3721        assert!(!column_index.is_null_page(0));
3722        assert_eq!(Some(0), column_index.null_count(0));
3723
3724        if let Some(stats) = r.metadata.statistics() {
3725            assert_eq!(stats.null_count_opt(), Some(0));
3726            assert_eq!(stats.distinct_count_opt(), None);
3727            if let Statistics::FixedLenByteArray(stats) = stats {
3728                let column_index_min_value = column_index.min_value(0).unwrap();
3729                let column_index_max_value = column_index.max_value(0).unwrap();
3730
3731                // Column index stats are truncated, while the column chunk's aren't.
3732                assert_ne!(stats.min_bytes_opt().unwrap(), column_index_min_value);
3733                assert_ne!(stats.max_bytes_opt().unwrap(), column_index_max_value);
3734
3735                assert_eq!(
3736                    column_index_min_value.len(),
3737                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
3738                );
3739                assert_eq!(column_index_min_value, &[97_u8; 64]);
3740                assert_eq!(
3741                    column_index_max_value.len(),
3742                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
3743                );
3744
3745                // We expect the last byte to be incremented
3746                assert_eq!(
3747                    *column_index_max_value.last().unwrap(),
3748                    *column_index_max_value.first().unwrap() + 1
3749                );
3750            } else {
3751                panic!("expecting Statistics::FixedLenByteArray");
3752            }
3753        } else {
3754            panic!("metadata missing statistics");
3755        }
3756    }
3757
3758    #[test]
3759    fn test_column_offset_index_truncating_spec_example() {
3760        // write data
3761        // and check the offset index and column index
3762        let page_writer = get_test_page_writer();
3763
3764        // Truncate values at 1 byte
3765        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3766        let props = Arc::new(builder.build());
3767        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3768
3769        let mut data = vec![FixedLenByteArray::default(); 1];
3770        // This is the expected min value
3771        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3772
3773        writer.write_batch(&data, None, None).unwrap();
3774
3775        writer.flush_data_pages().unwrap();
3776
3777        let r = writer.close().unwrap();
3778        let column_index = r.column_index.unwrap();
3779        let offset_index = r.offset_index.unwrap();
3780
3781        let column_index = match column_index {
3782            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3783            _ => panic!("wrong stats type"),
3784        };
3785
3786        assert_eq!(1, r.rows_written);
3787
3788        // column index
3789        assert_eq!(1, column_index.num_pages());
3790        assert_eq!(1, offset_index.page_locations.len());
3791        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
3792        assert!(!column_index.is_null_page(0));
3793        assert_eq!(Some(0), column_index.null_count(0));
3794
3795        if let Some(stats) = r.metadata.statistics() {
3796            assert_eq!(stats.null_count_opt(), Some(0));
3797            assert_eq!(stats.distinct_count_opt(), None);
3798            if let Statistics::FixedLenByteArray(_stats) = stats {
3799                let column_index_min_value = column_index.min_value(0).unwrap();
3800                let column_index_max_value = column_index.max_value(0).unwrap();
3801
3802                assert_eq!(column_index_min_value.len(), 1);
3803                assert_eq!(column_index_max_value.len(), 1);
3804
3805                assert_eq!("B".as_bytes(), column_index_min_value);
3806                assert_eq!("C".as_bytes(), column_index_max_value);
3807
3808                assert_ne!(column_index_min_value, stats.min_bytes_opt().unwrap());
3809                assert_ne!(column_index_max_value, stats.max_bytes_opt().unwrap());
3810            } else {
3811                panic!("expecting Statistics::FixedLenByteArray");
3812            }
3813        } else {
3814            panic!("metadata missing statistics");
3815        }
3816    }
3817
3818    #[test]
3819    fn test_float16_min_max_no_truncation() {
3820        // Even if we set truncation to occur at 1 byte, we should not truncate for Float16
3821        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3822        let props = Arc::new(builder.build());
3823        let page_writer = get_test_page_writer();
3824        let mut writer = get_test_float16_column_writer(page_writer, props);
3825
3826        let expected_value = f16::PI.to_le_bytes().to_vec();
3827        let data = vec![ByteArray::from(expected_value.clone()).into()];
3828        writer.write_batch(&data, None, None).unwrap();
3829        writer.flush_data_pages().unwrap();
3830
3831        let r = writer.close().unwrap();
3832
3833        // stats should still be written
3834        // ensure bytes weren't truncated for column index
3835        let column_index = r.column_index.unwrap();
3836        let column_index = match column_index {
3837            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3838            _ => panic!("wrong stats type"),
3839        };
3840        let column_index_min_bytes = column_index.min_value(0).unwrap();
3841        let column_index_max_bytes = column_index.max_value(0).unwrap();
3842        assert_eq!(expected_value, column_index_min_bytes);
3843        assert_eq!(expected_value, column_index_max_bytes);
3844
3845        // ensure bytes weren't truncated for statistics
3846        let stats = r.metadata.statistics().unwrap();
3847        if let Statistics::FixedLenByteArray(stats) = stats {
3848            let stats_min_bytes = stats.min_bytes_opt().unwrap();
3849            let stats_max_bytes = stats.max_bytes_opt().unwrap();
3850            assert_eq!(expected_value, stats_min_bytes);
3851            assert_eq!(expected_value, stats_max_bytes);
3852        } else {
3853            panic!("expecting Statistics::FixedLenByteArray");
3854        }
3855    }
3856
3857    #[test]
3858    fn test_decimal_min_max_no_truncation() {
3859        // Even if we set truncation to occur at 1 byte, we should not truncate for Decimal
3860        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3861        let props = Arc::new(builder.build());
3862        let page_writer = get_test_page_writer();
3863        let mut writer =
3864            get_test_decimals_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3865
3866        let expected_value = vec![
3867            255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8,
3868            231u8, 90u8, 0u8, 0u8,
3869        ];
3870        let data = vec![ByteArray::from(expected_value.clone()).into()];
3871        writer.write_batch(&data, None, None).unwrap();
3872        writer.flush_data_pages().unwrap();
3873
3874        let r = writer.close().unwrap();
3875
3876        // stats should still be written
3877        // ensure bytes weren't truncated for column index
3878        let column_index = r.column_index.unwrap();
3879        let column_index = match column_index {
3880            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3881            _ => panic!("wrong stats type"),
3882        };
3883        let column_index_min_bytes = column_index.min_value(0).unwrap();
3884        let column_index_max_bytes = column_index.max_value(0).unwrap();
3885        assert_eq!(expected_value, column_index_min_bytes);
3886        assert_eq!(expected_value, column_index_max_bytes);
3887
3888        // ensure bytes weren't truncated for statistics
3889        let stats = r.metadata.statistics().unwrap();
3890        if let Statistics::FixedLenByteArray(stats) = stats {
3891            let stats_min_bytes = stats.min_bytes_opt().unwrap();
3892            let stats_max_bytes = stats.max_bytes_opt().unwrap();
3893            assert_eq!(expected_value, stats_min_bytes);
3894            assert_eq!(expected_value, stats_max_bytes);
3895        } else {
3896            panic!("expecting Statistics::FixedLenByteArray");
3897        }
3898    }
3899
3900    #[test]
3901    fn test_statistics_truncating_byte_array_default() {
3902        let page_writer = get_test_page_writer();
3903
3904        // The default truncate length is 64 bytes
3905        let props = WriterProperties::builder().build().into();
3906        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
3907
3908        let mut data = vec![ByteArray::default(); 1];
3909        data[0].set_data(Bytes::from(String::from(
3910            "This string is longer than 64 bytes, so it will almost certainly be truncated.",
3911        )));
3912        writer.write_batch(&data, None, None).unwrap();
3913        writer.flush_data_pages().unwrap();
3914
3915        let r = writer.close().unwrap();
3916
3917        assert_eq!(1, r.rows_written);
3918
3919        let stats = r.metadata.statistics().expect("statistics");
3920        if let Statistics::ByteArray(_stats) = stats {
3921            let min_value = _stats.min_opt().unwrap();
3922            let max_value = _stats.max_opt().unwrap();
3923
3924            assert!(!_stats.min_is_exact());
3925            assert!(!_stats.max_is_exact());
3926
3927            let expected_len = 64;
3928            assert_eq!(min_value.len(), expected_len);
3929            assert_eq!(max_value.len(), expected_len);
3930
3931            let expected_min =
3932                "This string is longer than 64 bytes, so it will almost certainly".as_bytes();
3933            assert_eq!(expected_min, min_value.as_bytes());
3934            // note the max value is different from the min value: the last byte is incremented
3935            let expected_max =
3936                "This string is longer than 64 bytes, so it will almost certainlz".as_bytes();
3937            assert_eq!(expected_max, max_value.as_bytes());
3938        } else {
3939            panic!("expecting Statistics::ByteArray");
3940        }
3941    }
3942
3943    #[test]
3944    fn test_statistics_truncating_byte_array() {
3945        let page_writer = get_test_page_writer();
3946
3947        const TEST_TRUNCATE_LENGTH: usize = 1;
3948
3949        // Truncate values at 1 byte
3950        let builder =
3951            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3952        let props = Arc::new(builder.build());
3953        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
3954
3955        let mut data = vec![ByteArray::default(); 1];
3956        // This is the expected min value
3957        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3958
3959        writer.write_batch(&data, None, None).unwrap();
3960
3961        writer.flush_data_pages().unwrap();
3962
3963        let r = writer.close().unwrap();
3964
3965        assert_eq!(1, r.rows_written);
3966
3967        let stats = r.metadata.statistics().expect("statistics");
3968        assert_eq!(stats.null_count_opt(), Some(0));
3969        assert_eq!(stats.distinct_count_opt(), None);
3970        if let Statistics::ByteArray(_stats) = stats {
3971            let min_value = _stats.min_opt().unwrap();
3972            let max_value = _stats.max_opt().unwrap();
3973
3974            assert!(!_stats.min_is_exact());
3975            assert!(!_stats.max_is_exact());
3976
3977            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3978            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3979
3980            assert_eq!("B".as_bytes(), min_value.as_bytes());
3981            assert_eq!("C".as_bytes(), max_value.as_bytes());
3982        } else {
3983            panic!("expecting Statistics::ByteArray");
3984        }
3985    }
3986
3987    #[test]
3988    fn test_statistics_truncating_fixed_len_byte_array() {
3989        let page_writer = get_test_page_writer();
3990
3991        const TEST_TRUNCATE_LENGTH: usize = 1;
3992
3993        // Truncate values at 1 byte
3994        let builder =
3995            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3996        let props = Arc::new(builder.build());
3997        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3998
3999        let mut data = vec![FixedLenByteArray::default(); 1];
4000
4001        const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
4002        const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();
4003
4004        const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]]; // parquet specifies big-endian order for decimals
4005        const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] =
4006            [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0];
4007
4008        // This is the expected min value
4009        data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));
4010
4011        writer.write_batch(&data, None, None).unwrap();
4012
4013        writer.flush_data_pages().unwrap();
4014
4015        let r = writer.close().unwrap();
4016
4017        assert_eq!(1, r.rows_written);
4018
4019        let stats = r.metadata.statistics().expect("statistics");
4020        assert_eq!(stats.null_count_opt(), Some(0));
4021        assert_eq!(stats.distinct_count_opt(), None);
4022        if let Statistics::FixedLenByteArray(_stats) = stats {
4023            let min_value = _stats.min_opt().unwrap();
4024            let max_value = _stats.max_opt().unwrap();
4025
4026            assert!(!_stats.min_is_exact());
4027            assert!(!_stats.max_is_exact());
4028
4029            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
4030            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
4031
4032            assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
4033            assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());
4034
4035            let reconstructed_min = i128::from_be_bytes([
4036                min_value.as_bytes()[0],
4037                0,
4038                0,
4039                0,
4040                0,
4041                0,
4042                0,
4043                0,
4044                0,
4045                0,
4046                0,
4047                0,
4048                0,
4049                0,
4050                0,
4051                0,
4052            ]);
4053
4054            let reconstructed_max = i128::from_be_bytes([
4055                max_value.as_bytes()[0],
4056                0,
4057                0,
4058                0,
4059                0,
4060                0,
4061                0,
4062                0,
4063                0,
4064                0,
4065                0,
4066                0,
4067                0,
4068                0,
4069                0,
4070                0,
4071            ]);
4072
4073            // check that the inner value is correctly bounded by the min/max
4074            println!("min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}");
4075            assert!(reconstructed_min <= PSEUDO_DECIMAL_VALUE);
4076            println!("max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}");
4077            assert!(reconstructed_max >= PSEUDO_DECIMAL_VALUE);
4078        } else {
4079            panic!("expecting Statistics::FixedLenByteArray");
4080        }
4081    }
4082
4083    #[test]
4084    fn test_send() {
4085        fn test<T: Send>() {}
4086        test::<ColumnWriterImpl<Int32Type>>();
4087    }
4088
4089    #[test]
4090    fn test_increment() {
4091        let v = increment(vec![0, 0, 0]).unwrap();
4092        assert_eq!(&v, &[0, 0, 1]);
4093
4094        // Handle overflow
4095        let v = increment(vec![0, 255, 255]).unwrap();
4096        assert_eq!(&v, &[1, 0, 0]);
4097
4098        // Return `None` if all bytes are u8::MAX
4099        let v = increment(vec![255, 255, 255]);
4100        assert!(v.is_none());
4101    }
4102
4103    #[test]
4104    fn test_increment_utf8() {
4105        let test_inc = |o: &str, expected: &str| {
4106            if let Ok(v) = String::from_utf8(increment_utf8(o).unwrap()) {
4107                // Got the expected result...
4108                assert_eq!(v, expected);
4109                // and it's greater than the original string
4110                assert!(*v > *o);
4111                // Also show that BinaryArray level comparison works here
4112                let mut greater = ByteArray::new();
4113                greater.set_data(Bytes::from(v));
4114                let mut original = ByteArray::new();
4115                original.set_data(Bytes::from(o.as_bytes().to_vec()));
4116                assert!(greater > original);
4117            } else {
4118                panic!("Expected incremented UTF8 string to also be valid.");
4119            }
4120        };
4121
4122        // Basic ASCII case
4123        test_inc("hello", "hellp");
4124
4125        // 1-byte ending in max 1-byte
4126        test_inc("a\u{7f}", "b");
4127
4128        // 1-byte max should not truncate as it would need 2-byte code points
4129        assert!(increment_utf8("\u{7f}\u{7f}").is_none());
4130
4131        // UTF8 string
4132        test_inc("❤️🧡💛💚💙💜", "❤️🧡💛💚💙💝");
4133
4134        // 2-byte without overflow
4135        test_inc("éééé", "éééê");
4136
4137        // 2-byte that overflows lowest byte
4138        test_inc("\u{ff}\u{ff}", "\u{ff}\u{100}");
4139
4140        // 2-byte ending in max 2-byte
4141        test_inc("a\u{7ff}", "b");
4142
4143        // Max 2-byte should not truncate as it would need 3-byte code points
4144        assert!(increment_utf8("\u{7ff}\u{7ff}").is_none());
4145
4146        // 3-byte without overflow [U+800, U+800] -> [U+800, U+801] (note that these
4147        // characters should render right to left).
4148        test_inc("ࠀࠀ", "ࠀࠁ");
4149
4150        // 3-byte ending in max 3-byte
4151        test_inc("a\u{ffff}", "b");
4152
4153        // Max 3-byte should not truncate as it would need 4-byte code points
4154        assert!(increment_utf8("\u{ffff}\u{ffff}").is_none());
4155
4156        // 4-byte without overflow
4157        test_inc("𐀀𐀀", "𐀀𐀁");
4158
4159        // 4-byte ending in max unicode
4160        test_inc("a\u{10ffff}", "b");
4161
4162        // Max 4-byte should not truncate
4163        assert!(increment_utf8("\u{10ffff}\u{10ffff}").is_none());
4164
4165        // Skip over surrogate pair range (0xD800..=0xDFFF)
4166        //test_inc("a\u{D7FF}", "a\u{e000}");
4167        test_inc("a\u{D7FF}", "b");
4168    }
4169
4170    #[test]
4171    fn test_truncate_utf8() {
4172        // No-op
4173        let data = "❤️🧡💛💚💙💜";
4174        let r = truncate_utf8(data, data.len()).unwrap();
4175        assert_eq!(r.len(), data.len());
4176        assert_eq!(&r, data.as_bytes());
4177
4178        // We slice it away from the UTF8 boundary
4179        let r = truncate_utf8(data, 13).unwrap();
4180        assert_eq!(r.len(), 10);
4181        assert_eq!(&r, "❤️🧡".as_bytes());
4182
4183        // One multi-byte code point, and a length shorter than it, so we can't slice it
4184        let r = truncate_utf8("\u{0836}", 1);
4185        assert!(r.is_none());
4186
4187        // Test truncate and increment for max bounds on UTF-8 statistics
4188        // 7-bit (i.e. ASCII)
4189        let r = truncate_and_increment_utf8("yyyyyyyyy", 8).unwrap();
4190        assert_eq!(&r, "yyyyyyyz".as_bytes());
4191
4192        // 2-byte without overflow
4193        let r = truncate_and_increment_utf8("ééééé", 7).unwrap();
4194        assert_eq!(&r, "ééê".as_bytes());
4195
4196        // 2-byte that overflows lowest byte
4197        let r = truncate_and_increment_utf8("\u{ff}\u{ff}\u{ff}\u{ff}\u{ff}", 8).unwrap();
4198        assert_eq!(&r, "\u{ff}\u{ff}\u{ff}\u{100}".as_bytes());
4199
4200        // max 2-byte should not truncate as it would need 3-byte code points
4201        let r = truncate_and_increment_utf8("߿߿߿߿߿", 8);
4202        assert!(r.is_none());
4203
4204        // 3-byte without overflow [U+800, U+800, U+800] -> [U+800, U+801] (note that these
4205        // characters should render right to left).
4206        let r = truncate_and_increment_utf8("ࠀࠀࠀࠀ", 8).unwrap();
4207        assert_eq!(&r, "ࠀࠁ".as_bytes());
4208
4209        // max 3-byte should not truncate as it would need 4-byte code points
4210        let r = truncate_and_increment_utf8("\u{ffff}\u{ffff}\u{ffff}", 8);
4211        assert!(r.is_none());
4212
4213        // 4-byte without overflow
4214        let r = truncate_and_increment_utf8("𐀀𐀀𐀀𐀀", 9).unwrap();
4215        assert_eq!(&r, "𐀀𐀁".as_bytes());
4216
4217        // max 4-byte should not truncate
4218        let r = truncate_and_increment_utf8("\u{10ffff}\u{10ffff}", 8);
4219        assert!(r.is_none());
4220    }
4221
4222    #[test]
4223    // Check fallback truncation of statistics that should be UTF-8, but aren't
4224    // (see https://github.com/apache/arrow-rs/pull/6870).
4225    fn test_byte_array_truncate_invalid_utf8_statistics() {
4226        let message_type = "
4227            message test_schema {
4228                OPTIONAL BYTE_ARRAY a (UTF8);
4229            }
4230        ";
4231        let schema = Arc::new(parse_message_type(message_type).unwrap());
4232
4233        // Create Vec<ByteArray> containing non-UTF8 bytes
4234        let data = vec![ByteArray::from(vec![128u8; 32]); 7];
4235        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
4236        let file: File = tempfile::tempfile().unwrap();
4237        let props = Arc::new(
4238            WriterProperties::builder()
4239                .set_statistics_enabled(EnabledStatistics::Chunk)
4240                .set_statistics_truncate_length(Some(8))
4241                .build(),
4242        );
4243
4244        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
4245        let mut row_group_writer = writer.next_row_group().unwrap();
4246
4247        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
4248        col_writer
4249            .typed::<ByteArrayType>()
4250            .write_batch(&data, Some(&def_levels), None)
4251            .unwrap();
4252        col_writer.close().unwrap();
4253        row_group_writer.close().unwrap();
4254        let file_metadata = writer.close().unwrap();
4255        let stats = file_metadata.row_group(0).column(0).statistics().unwrap();
4256        assert!(!stats.max_is_exact());
4257        // Truncation of invalid UTF-8 should fall back to binary truncation, so last byte should
4258        // be incremented by 1.
4259        assert_eq!(
4260            stats.max_bytes_opt().map(|v| v.to_vec()),
4261            Some([128, 128, 128, 128, 128, 128, 128, 129].to_vec())
4262        );
4263    }
4264
4265    #[test]
4266    fn test_increment_max_binary_chars() {
4267        let r = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]);
4268        assert_eq!(&r.unwrap(), &[0xFF, 0xFE, 0xFE, 0x00, 0x00]);
4269
4270        let incremented = increment(vec![0xFF, 0xFF, 0xFF]);
4271        assert!(incremented.is_none())
4272    }
4273
4274    #[test]
4275    fn test_no_column_index_when_stats_disabled() {
4276        // https://github.com/apache/arrow-rs/issues/6010
4277        // Test that column index is not created/written for all-nulls column when page
4278        // statistics are disabled.
4279        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
4280        let props = Arc::new(
4281            WriterProperties::builder()
4282                .set_statistics_enabled(EnabledStatistics::None)
4283                .build(),
4284        );
4285        let column_writer = get_column_writer(descr, props, get_test_page_writer());
4286        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
4287
4288        let data = Vec::new();
4289        let def_levels = vec![0; 10];
4290        writer.write_batch(&data, Some(&def_levels), None).unwrap();
4291        writer.flush_data_pages().unwrap();
4292
4293        let column_close_result = writer.close().unwrap();
4294        assert!(column_close_result.offset_index.is_some());
4295        assert!(column_close_result.column_index.is_none());
4296    }
4297
4298    #[test]
4299    fn test_no_offset_index_when_disabled() {
4300        // Test that offset indexes can be disabled
4301        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
4302        let props = Arc::new(
4303            WriterProperties::builder()
4304                .set_statistics_enabled(EnabledStatistics::None)
4305                .set_offset_index_disabled(true)
4306                .build(),
4307        );
4308        let column_writer = get_column_writer(descr, props, get_test_page_writer());
4309        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
4310
4311        let data = Vec::new();
4312        let def_levels = vec![0; 10];
4313        writer.write_batch(&data, Some(&def_levels), None).unwrap();
4314        writer.flush_data_pages().unwrap();
4315
4316        let column_close_result = writer.close().unwrap();
4317        assert!(column_close_result.offset_index.is_none());
4318        assert!(column_close_result.column_index.is_none());
4319    }
4320
4321    #[test]
4322    fn test_offset_index_overridden() {
4323        // Test that offset indexes are not disabled when gathering page statistics
4324        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
4325        let props = Arc::new(
4326            WriterProperties::builder()
4327                .set_statistics_enabled(EnabledStatistics::Page)
4328                .set_offset_index_disabled(true)
4329                .build(),
4330        );
4331        let column_writer = get_column_writer(descr, props, get_test_page_writer());
4332        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
4333
4334        let data = Vec::new();
4335        let def_levels = vec![0; 10];
4336        writer.write_batch(&data, Some(&def_levels), None).unwrap();
4337        writer.flush_data_pages().unwrap();
4338
4339        let column_close_result = writer.close().unwrap();
4340        assert!(column_close_result.offset_index.is_some());
4341        assert!(column_close_result.column_index.is_some());
4342    }
4343
4344    #[test]
4345    fn test_boundary_order() -> Result<()> {
4346        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
4347        // min max both ascending
4348        let column_close_result = write_multiple_pages::<Int32Type>(
4349            &descr,
4350            &[
4351                &[Some(-10), Some(10)],
4352                &[Some(-5), Some(11)],
4353                &[None],
4354                &[Some(-5), Some(11)],
4355            ],
4356        )?;
4357        let boundary_order = column_close_result
4358            .column_index
4359            .unwrap()
4360            .get_boundary_order();
4361        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
4362
4363        // min max both descending
4364        let column_close_result = write_multiple_pages::<Int32Type>(
4365            &descr,
4366            &[
4367                &[Some(10), Some(11)],
4368                &[Some(5), Some(11)],
4369                &[None],
4370                &[Some(-5), Some(0)],
4371            ],
4372        )?;
4373        let boundary_order = column_close_result
4374            .column_index
4375            .unwrap()
4376            .get_boundary_order();
4377        assert_eq!(boundary_order, Some(BoundaryOrder::DESCENDING));
4378
4379        // min max both equal
4380        let column_close_result = write_multiple_pages::<Int32Type>(
4381            &descr,
4382            &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]],
4383        )?;
4384        let boundary_order = column_close_result
4385            .column_index
4386            .unwrap()
4387            .get_boundary_order();
4388        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
4389
4390        // only nulls
4391        let column_close_result =
4392            write_multiple_pages::<Int32Type>(&descr, &[&[None], &[None], &[None]])?;
4393        let boundary_order = column_close_result
4394            .column_index
4395            .unwrap()
4396            .get_boundary_order();
4397        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
4398
4399        // one page
4400        let column_close_result =
4401            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)]])?;
4402        let boundary_order = column_close_result
4403            .column_index
4404            .unwrap()
4405            .get_boundary_order();
4406        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
4407
4408        // one non-null page
4409        let column_close_result =
4410            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)], &[None]])?;
4411        let boundary_order = column_close_result
4412            .column_index
4413            .unwrap()
4414            .get_boundary_order();
4415        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
4416
4417        // min max both unordered
4418        let column_close_result = write_multiple_pages::<Int32Type>(
4419            &descr,
4420            &[
4421                &[Some(10), Some(11)],
4422                &[Some(11), Some(16)],
4423                &[None],
4424                &[Some(-5), Some(0)],
4425            ],
4426        )?;
4427        let boundary_order = column_close_result
4428            .column_index
4429            .unwrap()
4430            .get_boundary_order();
4431        assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));
4432
4433        // min max both ordered in different orders
4434        let column_close_result = write_multiple_pages::<Int32Type>(
4435            &descr,
4436            &[
4437                &[Some(1), Some(9)],
4438                &[Some(2), Some(8)],
4439                &[None],
4440                &[Some(3), Some(7)],
4441            ],
4442        )?;
4443        let boundary_order = column_close_result
4444            .column_index
4445            .unwrap()
4446            .get_boundary_order();
4447        assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));
4448
4449        Ok(())
4450    }
4451
4452    #[test]
4453    fn test_boundary_order_logical_type() -> Result<()> {
4454        // ensure that logical types account for different sort order than underlying
4455        // physical type representation
4456        let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));
4457        let fba_descr = {
4458            let tpe = SchemaType::primitive_type_builder(
4459                "col",
4460                FixedLenByteArrayType::get_physical_type(),
4461            )
4462            .with_length(2)
4463            .build()?;
4464            Arc::new(ColumnDescriptor::new(
4465                Arc::new(tpe),
4466                1,
4467                0,
4468                ColumnPath::from("col"),
4469            ))
4470        };
4471
4472        let values: &[&[Option<FixedLenByteArray>]] = &[
4473            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ONE)))],
4474            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ZERO)))],
4475            &[Some(FixedLenByteArray::from(ByteArray::from(
4476                f16::NEG_ZERO,
4477            )))],
4478            &[Some(FixedLenByteArray::from(ByteArray::from(f16::NEG_ONE)))],
4479        ];
4480
4481        // f16 descending
4482        let column_close_result =
4483            write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?;
4484        let boundary_order = column_close_result
4485            .column_index
4486            .unwrap()
4487            .get_boundary_order();
4488        assert_eq!(boundary_order, Some(BoundaryOrder::DESCENDING));
4489
4490        // same bytes, but fba unordered
4491        let column_close_result =
4492            write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?;
4493        let boundary_order = column_close_result
4494            .column_index
4495            .unwrap()
4496            .get_boundary_order();
4497        assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));
4498
4499        Ok(())
4500    }
4501
4502    #[test]
4503    fn test_interval_stats_should_not_have_min_max() {
4504        let input = [
4505            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
4506            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
4507            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2],
4508        ]
4509        .into_iter()
4510        .map(|s| ByteArray::from(s).into())
4511        .collect::<Vec<_>>();
4512
4513        let page_writer = get_test_page_writer();
4514        let mut writer = get_test_interval_column_writer(page_writer);
4515        writer.write_batch(&input, None, None).unwrap();
4516
4517        let metadata = writer.close().unwrap().metadata;
4518        let stats = if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
4519            stats.clone()
4520        } else {
4521            panic!("metadata missing statistics");
4522        };
4523        assert!(stats.min_bytes_opt().is_none());
4524        assert!(stats.max_bytes_opt().is_none());
4525    }
4526
4527    #[test]
4528    #[cfg(feature = "arrow")]
4529    fn test_column_writer_get_estimated_total_bytes() {
4530        let page_writer = get_test_page_writer();
4531        let props = Default::default();
4532        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
4533        assert_eq!(writer.get_estimated_total_bytes(), 0);
4534
4535        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
4536        writer.add_data_page().unwrap();
4537        let size_with_one_page = writer.get_estimated_total_bytes();
4538        assert_eq!(size_with_one_page, 20);
4539
4540        writer.write_batch(&[5, 6, 7, 8], None, None).unwrap();
4541        writer.add_data_page().unwrap();
4542        let size_with_two_pages = writer.get_estimated_total_bytes();
4543        // different pages have different compressed lengths
4544        assert_eq!(size_with_two_pages, 20 + 21);
4545    }
4546
4547    fn write_multiple_pages<T: DataType>(
4548        column_descr: &Arc<ColumnDescriptor>,
4549        pages: &[&[Option<T::T>]],
4550    ) -> Result<ColumnCloseResult> {
4551        let column_writer = get_column_writer(
4552            column_descr.clone(),
4553            Default::default(),
4554            get_test_page_writer(),
4555        );
4556        let mut writer = get_typed_column_writer::<T>(column_writer);
4557
4558        for &page in pages {
4559            let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
4560            let def_levels = page
4561                .iter()
4562                .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
4563                .collect::<Vec<_>>();
4564            writer.write_batch(&values, Some(&def_levels), None)?;
4565            writer.flush_data_pages()?;
4566        }
4567
4568        writer.close()
4569    }
4570
4571    /// Performs write-read roundtrip with randomly generated values and levels.
4572    /// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
4573    /// for a column.
4574    fn column_roundtrip_random<T: DataType>(
4575        props: WriterProperties,
4576        max_size: usize,
4577        min_value: T::T,
4578        max_value: T::T,
4579        max_def_level: i16,
4580        max_rep_level: i16,
4581    ) where
4582        T::T: PartialOrd + SampleUniform + Copy,
4583    {
4584        let mut num_values: usize = 0;
4585
4586        let mut buf: Vec<i16> = Vec::new();
4587        let def_levels = if max_def_level > 0 {
4588            random_numbers_range(max_size, 0, max_def_level + 1, &mut buf);
4589            for &dl in &buf[..] {
4590                if dl == max_def_level {
4591                    num_values += 1;
4592                }
4593            }
4594            Some(&buf[..])
4595        } else {
4596            num_values = max_size;
4597            None
4598        };
4599
4600        let mut buf: Vec<i16> = Vec::new();
4601        let rep_levels = if max_rep_level > 0 {
4602            random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
4603            buf[0] = 0; // Must start on record boundary
4604            Some(&buf[..])
4605        } else {
4606            None
4607        };
4608
4609        let mut values: Vec<T::T> = Vec::new();
4610        random_numbers_range(num_values, min_value, max_value, &mut values);
4611
4612        column_roundtrip::<T>(props, &values[..], def_levels, rep_levels);
4613    }
4614
4615    /// Performs write-read roundtrip and asserts written values and levels.
4616    fn column_roundtrip<T: DataType>(
4617        props: WriterProperties,
4618        values: &[T::T],
4619        def_levels: Option<&[i16]>,
4620        rep_levels: Option<&[i16]>,
4621    ) {
4622        let mut file = tempfile::tempfile().unwrap();
4623        let mut write = TrackedWrite::new(&mut file);
4624        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
4625
4626        let max_def_level = match def_levels {
4627            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
4628            None => 0i16,
4629        };
4630
4631        let max_rep_level = match rep_levels {
4632            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
4633            None => 0i16,
4634        };
4635
4636        let mut max_batch_size = values.len();
4637        if let Some(levels) = def_levels {
4638            max_batch_size = max_batch_size.max(levels.len());
4639        }
4640        if let Some(levels) = rep_levels {
4641            max_batch_size = max_batch_size.max(levels.len());
4642        }
4643
4644        let mut writer =
4645            get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));
4646
4647        let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
4648        assert_eq!(values_written, values.len());
4649        let result = writer.close().unwrap();
4650
4651        drop(write);
4652
4653        let props = ReaderProperties::builder()
4654            .set_backward_compatible_lz4(false)
4655            .build();
4656        let page_reader = Box::new(
4657            SerializedPageReader::new_with_properties(
4658                Arc::new(file),
4659                &result.metadata,
4660                result.rows_written as usize,
4661                None,
4662                Arc::new(props),
4663            )
4664            .unwrap(),
4665        );
4666        let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level);
4667
4668        let mut actual_values = Vec::with_capacity(max_batch_size);
4669        let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size));
4670        let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size));
4671
4672        let (_, values_read, levels_read) = reader
4673            .read_records(
4674                max_batch_size,
4675                actual_def_levels.as_mut(),
4676                actual_rep_levels.as_mut(),
4677                &mut actual_values,
4678            )
4679            .unwrap();
4680
4681        // Assert values, definition and repetition levels.
4682
4683        assert_eq!(&actual_values[..values_read], values);
4684        match actual_def_levels {
4685            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels),
4686            None => assert_eq!(None, def_levels),
4687        }
4688        match actual_rep_levels {
4689            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels),
4690            None => assert_eq!(None, rep_levels),
4691        }
4692
4693        // Assert written rows.
4694
4695        if let Some(levels) = actual_rep_levels {
4696            let mut actual_rows_written = 0;
4697            for l in levels {
4698                if l == 0 {
4699                    actual_rows_written += 1;
4700                }
4701            }
4702            assert_eq!(actual_rows_written, result.rows_written);
4703        } else if actual_def_levels.is_some() {
4704            assert_eq!(levels_read as u64, result.rows_written);
4705        } else {
4706            assert_eq!(values_read as u64, result.rows_written);
4707        }
4708    }
4709
4710    /// Performs write of provided values and returns column metadata of those values.
4711    /// Used to test encoding support for column writer.
4712    fn column_write_and_get_metadata<T: DataType>(
4713        props: WriterProperties,
4714        values: &[T::T],
4715    ) -> ColumnChunkMetaData {
4716        let page_writer = get_test_page_writer();
4717        let props = Arc::new(props);
4718        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4719        writer.write_batch(values, None, None).unwrap();
4720        writer.close().unwrap().metadata
4721    }
4722
4723    // Helper function to more compactly create a PageEncodingStats struct.
4724    fn encoding_stats(page_type: PageType, encoding: Encoding, count: i32) -> PageEncodingStats {
4725        PageEncodingStats {
4726            page_type,
4727            encoding,
4728            count,
4729        }
4730    }
4731
4732    // Function to use in tests for EncodingWriteSupport. This checks that dictionary
4733    // offset and encodings to make sure that column writer uses provided by trait
4734    // encodings.
4735    fn check_encoding_write_support<T: DataType>(
4736        version: WriterVersion,
4737        dict_enabled: bool,
4738        data: &[T::T],
4739        dictionary_page_offset: Option<i64>,
4740        encodings: &[Encoding],
4741        page_encoding_stats: &[PageEncodingStats],
4742    ) {
4743        let props = WriterProperties::builder()
4744            .set_writer_version(version)
4745            .set_dictionary_enabled(dict_enabled)
4746            .build();
4747        let meta = column_write_and_get_metadata::<T>(props, data);
4748        assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
4749        assert_eq!(meta.encodings().collect::<Vec<_>>(), encodings);
4750        assert_eq!(meta.page_encoding_stats().unwrap(), page_encoding_stats);
4751    }
4752
4753    /// Returns column writer.
4754    fn get_test_column_writer<'a, T: DataType>(
4755        page_writer: Box<dyn PageWriter + 'a>,
4756        max_def_level: i16,
4757        max_rep_level: i16,
4758        props: WriterPropertiesPtr,
4759    ) -> ColumnWriterImpl<'a, T> {
4760        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4761        let column_writer = get_column_writer(descr, props, page_writer);
4762        get_typed_column_writer::<T>(column_writer)
4763    }
4764
4765    fn get_test_column_writer_with_path<'a, T: DataType>(
4766        page_writer: Box<dyn PageWriter + 'a>,
4767        max_def_level: i16,
4768        max_rep_level: i16,
4769        props: WriterPropertiesPtr,
4770        path: ColumnPath,
4771    ) -> ColumnWriterImpl<'a, T> {
4772        let descr = Arc::new(get_test_column_descr_with_path::<T>(
4773            max_def_level,
4774            max_rep_level,
4775            path,
4776        ));
4777        let column_writer = get_column_writer(descr, props, page_writer);
4778        get_typed_column_writer::<T>(column_writer)
4779    }
4780
    /// Page layout of a column chunk, as gathered by [`write_and_collect_pages`].
    struct CollectedPages {
        /// `(byte size, value count)` for every data page (V1 or V2), in file
        /// order. The size is `page.buffer().len()` of the page returned by the
        /// page reader.
        /// NOTE(review): the reader likely hands back decompressed buffers, so
        /// this equals the compressed size only when no codec is used — confirm.
        data_pages: Vec<(usize, u32)>,
        /// Buffer size of the largest dictionary page read, or 0 if no
        /// dictionary page was present.
        dict_page_size: usize,
    }
4788
4789    /// Writes `data` (with optional def/rep levels) through a raw
4790    /// `ColumnWriterImpl` configured by `props`, then re-reads the file and
4791    /// returns its page layout. Shared by the page-size regression tests so
4792    /// each only has to express its props, input, and assertions.
4793    fn write_and_collect_pages<T: DataType>(
4794        props: WriterProperties,
4795        max_def_level: i16,
4796        max_rep_level: i16,
4797        data: &[T::T],
4798        def_levels: Option<&[i16]>,
4799        rep_levels: Option<&[i16]>,
4800    ) -> CollectedPages {
4801        let mut file = tempfile::tempfile().unwrap();
4802        let mut write = TrackedWrite::new(&mut file);
4803        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
4804        let mut writer =
4805            get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));
4806        writer.write_batch(data, def_levels, rep_levels).unwrap();
4807        let r = writer.close().unwrap();
4808        drop(write);
4809
4810        let read_props = ReaderProperties::builder()
4811            .set_backward_compatible_lz4(false)
4812            .build();
4813        let mut page_reader = Box::new(
4814            SerializedPageReader::new_with_properties(
4815                Arc::new(file),
4816                &r.metadata,
4817                r.rows_written as usize,
4818                None,
4819                Arc::new(read_props),
4820            )
4821            .unwrap(),
4822        );
4823
4824        let mut collected = CollectedPages {
4825            data_pages: Vec::new(),
4826            dict_page_size: 0,
4827        };
4828        while let Some(page) = page_reader.get_next_page().unwrap() {
4829            match page.page_type() {
4830                PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
4831                    collected
4832                        .data_pages
4833                        .push((page.buffer().len(), page.num_values()));
4834                }
4835                PageType::DICTIONARY_PAGE => {
4836                    collected.dict_page_size = collected.dict_page_size.max(page.buffer().len());
4837                }
4838                _ => {}
4839            }
4840        }
4841        collected
4842    }
4843
4844    /// Returns column reader.
4845    fn get_test_column_reader<T: DataType>(
4846        page_reader: Box<dyn PageReader>,
4847        max_def_level: i16,
4848        max_rep_level: i16,
4849    ) -> ColumnReaderImpl<T> {
4850        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4851        let column_reader = get_column_reader(descr, page_reader);
4852        get_typed_column_reader::<T>(column_reader)
4853    }
4854
4855    /// Returns descriptor for primitive column.
4856    fn get_test_column_descr<T: DataType>(
4857        max_def_level: i16,
4858        max_rep_level: i16,
4859    ) -> ColumnDescriptor {
4860        let path = ColumnPath::from("col");
4861        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4862            // length is set for "encoding support" tests for FIXED_LEN_BYTE_ARRAY type,
4863            // it should be no-op for other types
4864            .with_length(1)
4865            .build()
4866            .unwrap();
4867        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4868    }
4869
4870    fn get_test_column_descr_with_path<T: DataType>(
4871        max_def_level: i16,
4872        max_rep_level: i16,
4873        path: ColumnPath,
4874    ) -> ColumnDescriptor {
4875        let name = path.string();
4876        let tpe = SchemaType::primitive_type_builder(&name, T::get_physical_type())
4877            // length is set for "encoding support" tests for FIXED_LEN_BYTE_ARRAY type,
4878            // it should be no-op for other types
4879            .with_length(1)
4880            .build()
4881            .unwrap();
4882        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4883    }
4884
4885    fn write_and_collect_page_values(
4886        path: ColumnPath,
4887        props: WriterPropertiesPtr,
4888        data: &[i32],
4889    ) -> Vec<u32> {
4890        let mut file = tempfile::tempfile().unwrap();
4891        let mut write = TrackedWrite::new(&mut file);
4892        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
4893        let mut writer =
4894            get_test_column_writer_with_path::<Int32Type>(page_writer, 0, 0, props, path);
4895        writer.write_batch(data, None, None).unwrap();
4896        let r = writer.close().unwrap();
4897
4898        drop(write);
4899
4900        let props = ReaderProperties::builder()
4901            .set_backward_compatible_lz4(false)
4902            .build();
4903        let mut page_reader = Box::new(
4904            SerializedPageReader::new_with_properties(
4905                Arc::new(file),
4906                &r.metadata,
4907                r.rows_written as usize,
4908                None,
4909                Arc::new(props),
4910            )
4911            .unwrap(),
4912        );
4913
4914        let mut values_per_page = Vec::new();
4915        while let Some(page) = page_reader.get_next_page().unwrap() {
4916            assert_eq!(page.page_type(), PageType::DATA_PAGE);
4917            values_per_page.push(page.num_values());
4918        }
4919
4920        values_per_page
4921    }
4922
4923    /// Returns page writer that collects pages without serializing them.
4924    fn get_test_page_writer() -> Box<dyn PageWriter> {
4925        Box::new(TestPageWriter {})
4926    }
4927
4928    struct TestPageWriter {}
4929
4930    impl PageWriter for TestPageWriter {
4931        fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
4932            let mut res = PageWriteSpec::new();
4933            res.page_type = page.page_type();
4934            res.uncompressed_size = page.uncompressed_size();
4935            res.compressed_size = page.compressed_size();
4936            res.num_values = page.num_values();
4937            res.offset = 0;
4938            res.bytes_written = page.data().len() as u64;
4939            Ok(res)
4940        }
4941
4942        fn close(&mut self) -> Result<()> {
4943            Ok(())
4944        }
4945    }
4946
4947    /// Write data into parquet using [`get_test_page_writer`] and [`get_test_column_writer`] and returns generated statistics.
4948    fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics {
4949        let page_writer = get_test_page_writer();
4950        let props = Default::default();
4951        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4952        writer.write_batch(values, None, None).unwrap();
4953
4954        let metadata = writer.close().unwrap().metadata;
4955        if let Some(stats) = metadata.statistics() {
4956            stats.clone()
4957        } else {
4958            panic!("metadata missing statistics");
4959        }
4960    }
4961
4962    /// Returns Decimals column writer.
4963    fn get_test_decimals_column_writer<T: DataType>(
4964        page_writer: Box<dyn PageWriter>,
4965        max_def_level: i16,
4966        max_rep_level: i16,
4967        props: WriterPropertiesPtr,
4968    ) -> ColumnWriterImpl<'static, T> {
4969        let descr = Arc::new(get_test_decimals_column_descr::<T>(
4970            max_def_level,
4971            max_rep_level,
4972        ));
4973        let column_writer = get_column_writer(descr, props, page_writer);
4974        get_typed_column_writer::<T>(column_writer)
4975    }
4976
4977    /// Returns descriptor for Decimal type with primitive column.
4978    fn get_test_decimals_column_descr<T: DataType>(
4979        max_def_level: i16,
4980        max_rep_level: i16,
4981    ) -> ColumnDescriptor {
4982        let path = ColumnPath::from("col");
4983        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4984            .with_length(16)
4985            .with_logical_type(Some(LogicalType::decimal(2, 3)))
4986            .with_scale(2)
4987            .with_precision(3)
4988            .build()
4989            .unwrap();
4990        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4991    }
4992
4993    fn float16_statistics_roundtrip(
4994        values: &[FixedLenByteArray],
4995    ) -> ValueStatistics<FixedLenByteArray> {
4996        let page_writer = get_test_page_writer();
4997        let mut writer = get_test_float16_column_writer(page_writer, Default::default());
4998        writer.write_batch(values, None, None).unwrap();
4999
5000        let metadata = writer.close().unwrap().metadata;
5001        if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
5002            stats.clone()
5003        } else {
5004            panic!("metadata missing statistics");
5005        }
5006    }
5007
5008    fn get_test_float16_column_writer(
5009        page_writer: Box<dyn PageWriter>,
5010        props: WriterPropertiesPtr,
5011    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
5012        let descr = Arc::new(get_test_float16_column_descr(0, 0));
5013        let column_writer = get_column_writer(descr, props, page_writer);
5014        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
5015    }
5016
5017    fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
5018        let path = ColumnPath::from("col");
5019        let tpe =
5020            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
5021                .with_length(2)
5022                .with_logical_type(Some(LogicalType::Float16))
5023                .build()
5024                .unwrap();
5025        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
5026    }
5027
5028    fn get_test_interval_column_writer(
5029        page_writer: Box<dyn PageWriter>,
5030    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
5031        let descr = Arc::new(get_test_interval_column_descr());
5032        let column_writer = get_column_writer(descr, Default::default(), page_writer);
5033        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
5034    }
5035
5036    fn get_test_interval_column_descr() -> ColumnDescriptor {
5037        let path = ColumnPath::from("col");
5038        let tpe =
5039            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
5040                .with_length(12)
5041                .with_converted_type(ConvertedType::INTERVAL)
5042                .build()
5043                .unwrap();
5044        ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
5045    }
5046
5047    /// Returns column writer for UINT32 Column provided as ConvertedType only
5048    fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
5049        page_writer: Box<dyn PageWriter + 'a>,
5050        max_def_level: i16,
5051        max_rep_level: i16,
5052        props: WriterPropertiesPtr,
5053    ) -> ColumnWriterImpl<'a, T> {
5054        let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
5055            max_def_level,
5056            max_rep_level,
5057        ));
5058        let column_writer = get_column_writer(descr, props, page_writer);
5059        get_typed_column_writer::<T>(column_writer)
5060    }
5061
5062    /// Returns column descriptor for UINT32 Column provided as ConvertedType only
5063    fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
5064        max_def_level: i16,
5065        max_rep_level: i16,
5066    ) -> ColumnDescriptor {
5067        let path = ColumnPath::from("col");
5068        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
5069            .with_converted_type(ConvertedType::UINT_32)
5070            .build()
5071            .unwrap();
5072        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
5073    }
5074
5075    #[test]
5076    fn test_page_v2_snappy_compression_fallback() {
5077        // Test that PageV2 sets is_compressed to false when Snappy compression increases data size
5078        let page_writer = TestPageWriter {};
5079
5080        // Create WriterProperties with PageV2 and Snappy compression
5081        let props = WriterProperties::builder()
5082            .set_writer_version(WriterVersion::PARQUET_2_0)
5083            // Disable dictionary to ensure data is written directly
5084            .set_dictionary_enabled(false)
5085            .set_compression(Compression::SNAPPY)
5086            .build();
5087
5088        let mut column_writer =
5089            get_test_column_writer::<ByteArrayType>(Box::new(page_writer), 0, 0, Arc::new(props));
5090
5091        // Create small, simple data that Snappy compression will likely increase in size
5092        // due to compression overhead for very small data
5093        let values = vec![ByteArray::from("a")];
5094
5095        column_writer.write_batch(&values, None, None).unwrap();
5096
5097        let result = column_writer.close().unwrap();
5098        assert_eq!(
5099            result.metadata.uncompressed_size(),
5100            result.metadata.compressed_size()
5101        );
5102    }
5103
    /// Builder-style description of one write-then-read roundtrip that feeds
    /// [`LevelDataRef`] level data straight into `write_batch_internal`.
    /// Configure it with the `with_*` methods and execute it with `run`.
    struct ColumnRoundTripUniform<'a, T: DataType> {
        /// Writer properties for the column under test.
        props: WriterProperties,
        /// Leaf values handed to the writer.
        values: &'a [T::T],
        /// Definition levels handed to the writer.
        def_levels: LevelDataRef<'a>,
        /// Repetition levels handed to the writer.
        rep_levels: LevelDataRef<'a>,
        /// Maximum definition level of the test column's descriptor.
        max_def_level: i16,
        /// Maximum repetition level of the test column's descriptor.
        max_rep_level: i16,
        /// Values expected when the column is read back.
        expected_values: &'a [T::T],
        /// Definition levels expected on read; `None` means they are neither
        /// read back nor checked.
        expected_def_levels: Option<&'a [i16]>,
        /// Repetition levels expected on read; `None` means they are neither
        /// read back nor checked.
        expected_rep_levels: Option<&'a [i16]>,
    }
5115
5116    impl<'a, T: DataType> ColumnRoundTripUniform<'a, T>
5117    where
5118        T::T: PartialEq + std::fmt::Debug,
5119    {
5120        fn new() -> Self {
5121            Self {
5122                props: Default::default(),
5123                values: &[],
5124                def_levels: LevelDataRef::Absent,
5125                rep_levels: LevelDataRef::Absent,
5126                max_def_level: 0,
5127                max_rep_level: 0,
5128                expected_values: &[],
5129                expected_def_levels: None,
5130                expected_rep_levels: None,
5131            }
5132        }
5133
5134        fn with_props(mut self, props: WriterProperties) -> Self {
5135            self.props = props;
5136            self
5137        }
5138
5139        fn with_values(mut self, values: &'a [T::T]) -> Self {
5140            self.values = values;
5141            self
5142        }
5143
5144        fn with_def_levels(mut self, def_levels: LevelDataRef<'a>) -> Self {
5145            self.def_levels = def_levels;
5146            self
5147        }
5148
5149        fn with_rep_levels(mut self, rep_levels: LevelDataRef<'a>) -> Self {
5150            self.rep_levels = rep_levels;
5151            self
5152        }
5153
5154        fn with_max_def_level(mut self, max_def_level: i16) -> Self {
5155            self.max_def_level = max_def_level;
5156            self
5157        }
5158
5159        fn with_max_rep_level(mut self, max_rep_level: i16) -> Self {
5160            self.max_rep_level = max_rep_level;
5161            self
5162        }
5163
5164        fn with_expected_values(mut self, expected_values: &'a [T::T]) -> Self {
5165            self.expected_values = expected_values;
5166            self
5167        }
5168
5169        fn with_expected_def_levels(mut self, expected_def_levels: &'a [i16]) -> Self {
5170            self.expected_def_levels = Some(expected_def_levels);
5171            self
5172        }
5173
5174        fn with_expected_rep_levels(mut self, expected_rep_levels: &'a [i16]) -> Self {
5175            self.expected_rep_levels = Some(expected_rep_levels);
5176            self
5177        }
5178
5179        /// Write-then-read roundtrip using `write_batch_internal` with the given
5180        /// [`LevelDataRef`] variants, and assert the read-back matches `expected_*`.
5181        fn run(self) {
5182            let mut file = tempfile::tempfile().unwrap();
5183            let mut write = TrackedWrite::new(&mut file);
5184            let page_writer = Box::new(SerializedPageWriter::new(&mut write));
5185            let mut writer = get_test_column_writer::<T>(
5186                page_writer,
5187                self.max_def_level,
5188                self.max_rep_level,
5189                Arc::new(self.props),
5190            );
5191
5192            writer
5193                .write_batch_internal(
5194                    self.values,
5195                    None,
5196                    self.def_levels,
5197                    self.rep_levels,
5198                    None,
5199                    None,
5200                    None,
5201                )
5202                .unwrap();
5203            let result = writer.close().unwrap();
5204            drop(write);
5205
5206            let props = ReaderProperties::builder()
5207                .set_backward_compatible_lz4(false)
5208                .build();
5209            let page_reader = Box::new(
5210                SerializedPageReader::new_with_properties(
5211                    Arc::new(file),
5212                    &result.metadata,
5213                    result.rows_written as usize,
5214                    None,
5215                    Arc::new(props),
5216                )
5217                .unwrap(),
5218            );
5219            let mut reader =
5220                get_test_column_reader::<T>(page_reader, self.max_def_level, self.max_rep_level);
5221
5222            let batch_size = self
5223                .expected_def_levels
5224                .map_or(self.expected_values.len(), |l| l.len());
5225            let mut actual_values = Vec::with_capacity(batch_size);
5226            let mut actual_def = self
5227                .expected_def_levels
5228                .map(|_| Vec::with_capacity(batch_size));
5229            let mut actual_rep = self
5230                .expected_rep_levels
5231                .map(|_| Vec::with_capacity(batch_size));
5232
5233            let (_, values_read, levels_read) = reader
5234                .read_records(
5235                    batch_size,
5236                    actual_def.as_mut(),
5237                    actual_rep.as_mut(),
5238                    &mut actual_values,
5239                )
5240                .unwrap();
5241
5242            assert_eq!(&actual_values[..values_read], self.expected_values);
5243            if let Some(ref v) = actual_def {
5244                assert_eq!(&v[..levels_read], self.expected_def_levels.unwrap());
5245            }
5246            if let Some(ref v) = actual_rep {
5247                assert_eq!(&v[..levels_read], self.expected_rep_levels.unwrap());
5248            }
5249        }
5250    }
5251
5252    #[test]
5253    fn test_level_data_ref_value_count() {
5254        // `value_count` is what the byte-budget chunker uses to convert a
5255        // chunk's level span into a leaf-value count. It must work for any
5256        // column shape — flat, nullable, or nested — because the leaf
5257        // values array is decoupled from the rep/def level stream.
5258        let max_def = 2;
5259        // Non-nullable / unrepeated: no def levels materialized — every
5260        // level is a value.
5261        assert_eq!(LevelDataRef::Absent.value_count(64, max_def), 64);
5262        // Uniform run of present values, and of nulls.
5263        assert_eq!(
5264            LevelDataRef::Uniform {
5265                value: max_def,
5266                count: 40
5267            }
5268            .value_count(40, max_def),
5269            40
5270        );
5271        assert_eq!(
5272            LevelDataRef::Uniform {
5273                value: max_def - 1,
5274                count: 40
5275            }
5276            .value_count(40, max_def),
5277            0
5278        );
5279        // Materialized def levels (nullable / nested): only levels equal to
5280        // `max_def` are values; empty-list / null levels are not.
5281        let levels = [2i16, 0, 2, 1, 2, 2, 0];
5282        assert_eq!(
5283            LevelDataRef::Materialized(&levels).value_count(levels.len(), max_def),
5284            4
5285        );
5286    }
5287
5288    #[test]
5289    fn test_uniform_def_levels_all_null() {
5290        // All-null column: def_level=0 (null) for every slot, no values written.
5291        let max_def_level = 1;
5292        let count = 100;
5293        let expected_def_levels = vec![0i16; count];
5294        ColumnRoundTripUniform::<Int32Type>::new()
5295            .with_def_levels(LevelDataRef::Uniform { value: 0, count })
5296            .with_max_def_level(max_def_level)
5297            .with_expected_def_levels(&expected_def_levels)
5298            .run();
5299    }
5300
5301    #[test]
5302    fn test_uniform_def_levels_all_valid() {
5303        // All-valid column: def_level=max for every slot, all values written.
5304        let max_def_level = 1;
5305        let values: Vec<i32> = (0..50).collect();
5306        let expected_def_levels = vec![max_def_level; values.len()];
5307        ColumnRoundTripUniform::<Int32Type>::new()
5308            .with_values(&values)
5309            .with_def_levels(LevelDataRef::Uniform {
5310                value: max_def_level,
5311                count: values.len(),
5312            })
5313            .with_max_def_level(max_def_level)
5314            .with_expected_values(&values)
5315            .with_expected_def_levels(&expected_def_levels)
5316            .run();
5317    }
5318
5319    #[test]
5320    fn test_uniform_def_and_rep_levels() {
5321        // Simulates a list column where every row is null:
5322        // def=0, rep=0 for each row (one row = one entry with no child values).
5323        let max_def_level = 2;
5324        let max_rep_level = 1;
5325        let count = 200;
5326        let expected_def_levels = vec![0i16; count];
5327        let expected_rep_levels = vec![0i16; count];
5328        ColumnRoundTripUniform::<Int32Type>::new()
5329            .with_def_levels(LevelDataRef::Uniform { value: 0, count })
5330            .with_rep_levels(LevelDataRef::Uniform { value: 0, count })
5331            .with_max_def_level(max_def_level)
5332            .with_max_rep_level(max_rep_level)
5333            .with_expected_def_levels(&expected_def_levels)
5334            .with_expected_rep_levels(&expected_rep_levels)
5335            .run();
5336    }
5337
5338    #[test]
5339    fn test_uniform_levels_v1_and_v2() {
5340        // Verify uniform levels work identically for both Parquet writer versions.
5341        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
5342            let props = WriterProperties::builder()
5343                .set_writer_version(version)
5344                .build();
5345            let max_def = 1;
5346            let count = 100;
5347            let expected_def_levels = vec![0i16; count];
5348            ColumnRoundTripUniform::<Int32Type>::new()
5349                .with_props(props)
5350                .with_def_levels(LevelDataRef::Uniform { value: 0, count })
5351                .with_max_def_level(max_def)
5352                .with_expected_def_levels(&expected_def_levels)
5353                .run();
5354        }
5355    }
5356}