// parquet/column/writer/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
//! Contains column writer API.
19
20use bytes::Bytes;
21use half::f16;
22
23use crate::bloom_filter::Sbbf;
24use crate::file::page_index::column_index::ColumnIndexMetaData;
25use crate::file::page_index::offset_index::OffsetIndexMetaData;
26use std::collections::{BTreeSet, VecDeque};
27use std::str;
28
29use crate::basic::{
30    BoundaryOrder, Compression, ConvertedType, Encoding, EncodingMask, LogicalType, PageType, Type,
31};
32use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
33use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
34use crate::compression::{Codec, CodecOptionsBuilder, create_codec};
35use crate::data_type::private::ParquetValueType;
36use crate::data_type::*;
37use crate::encodings::levels::LevelEncoder;
38#[cfg(feature = "encryption")]
39use crate::encryption::encrypt::get_column_crypto_metadata;
40use crate::errors::{ParquetError, Result};
41use crate::file::metadata::{
42    ColumnChunkMetaData, ColumnChunkMetaDataBuilder, ColumnIndexBuilder, LevelHistogram,
43    OffsetIndexBuilder, PageEncodingStats,
44};
45use crate::file::properties::{
46    EnabledStatistics, WriterProperties, WriterPropertiesPtr, WriterVersion,
47};
48use crate::file::statistics::{Statistics, ValueStatistics};
49use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
50
51pub(crate) mod encoder;
52
/// Dispatches expression `$b` against the typed writer bound to `$i`, for
/// whichever [`ColumnWriter`] variant `$e` currently holds.
///
/// This lets the non-generic methods on [`ColumnWriter`] delegate to the
/// underlying [`ColumnWriterImpl`] without repeating an 8-arm match per method.
macro_rules! downcast_writer {
    ($e:expr, $i:ident, $b:expr) => {
        match $e {
            Self::BoolColumnWriter($i) => $b,
            Self::Int32ColumnWriter($i) => $b,
            Self::Int64ColumnWriter($i) => $b,
            Self::Int96ColumnWriter($i) => $b,
            Self::FloatColumnWriter($i) => $b,
            Self::DoubleColumnWriter($i) => $b,
            Self::ByteArrayColumnWriter($i) => $b,
            Self::FixedLenByteArrayColumnWriter($i) => $b,
        }
    };
}
67
/// Column writer for a Parquet type.
///
/// Each variant wraps a typed [`ColumnWriterImpl`] for one physical type; use
/// [`get_typed_column_writer`] (or its `_ref`/`_mut` variants) to recover the
/// typed writer.
///
/// See [`get_column_writer`] to create instances of this type
pub enum ColumnWriter<'a> {
    /// Column writer for boolean type
    BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
    /// Column writer for int32 type
    Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
    /// Column writer for int64 type
    Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
    /// Column writer for int96 (timestamp) type
    Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
    /// Column writer for float type
    FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
    /// Column writer for double type
    DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
    /// Column writer for byte array type
    ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
    /// Column writer for fixed length byte array type
    FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
}
89
impl ColumnWriter<'_> {
    /// Returns the estimated total memory usage.
    ///
    /// Delegates to the typed writer via `downcast_writer!`.
    #[cfg(feature = "arrow")]
    pub(crate) fn memory_size(&self) -> usize {
        downcast_writer!(self, typed, typed.memory_size())
    }

    /// Returns the estimated total encoded bytes for this column writer,
    /// including data not yet flushed to a page.
    #[cfg(feature = "arrow")]
    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
        downcast_writer!(self, typed, typed.get_estimated_total_bytes())
    }

    /// Finalize the currently buffered values as a data page.
    ///
    /// This is used by content-defined chunking to force a page boundary at
    /// content-determined positions.
    #[cfg(feature = "arrow")]
    pub(crate) fn add_data_page(&mut self) -> Result<()> {
        downcast_writer!(self, typed, typed.add_data_page())
    }

    /// Close this [`ColumnWriter`], returning the metadata for the column chunk.
    pub fn close(self) -> Result<ColumnCloseResult> {
        downcast_writer!(self, typed, typed.close())
    }
}
117
118/// Create a specific column writer corresponding to column descriptor `descr`.
119pub fn get_column_writer<'a>(
120    descr: ColumnDescPtr,
121    props: WriterPropertiesPtr,
122    page_writer: Box<dyn PageWriter + 'a>,
123) -> ColumnWriter<'a> {
124    match descr.physical_type() {
125        Type::BOOLEAN => {
126            ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
127        }
128        Type::INT32 => {
129            ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
130        }
131        Type::INT64 => {
132            ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
133        }
134        Type::INT96 => {
135            ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
136        }
137        Type::FLOAT => {
138            ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
139        }
140        Type::DOUBLE => {
141            ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
142        }
143        Type::BYTE_ARRAY => {
144            ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
145        }
146        Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter(
147            ColumnWriterImpl::new(descr, props, page_writer),
148        ),
149    }
150}
151
152/// Gets a typed column writer for the specific type `T`, by "up-casting" `col_writer` of
153/// non-generic type to a generic column writer type `ColumnWriterImpl`.
154///
155/// Panics if actual enum value for `col_writer` does not match the type `T`.
156pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> {
157    T::get_column_writer(col_writer).unwrap_or_else(|| {
158        panic!(
159            "Failed to convert column writer into a typed column writer for `{}` type",
160            T::get_physical_type()
161        )
162    })
163}
164
165/// Similar to `get_typed_column_writer` but returns a reference.
166pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
167    col_writer: &'b ColumnWriter<'a>,
168) -> &'b ColumnWriterImpl<'a, T> {
169    T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
170        panic!(
171            "Failed to convert column writer into a typed column writer for `{}` type",
172            T::get_physical_type()
173        )
174    })
175}
176
177/// Similar to `get_typed_column_writer` but returns a reference.
178pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
179    col_writer: &'a mut ColumnWriter<'b>,
180) -> &'a mut ColumnWriterImpl<'b, T> {
181    T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
182        panic!(
183            "Failed to convert column writer into a typed column writer for `{}` type",
184            T::get_physical_type()
185        )
186    })
187}
188
/// Metadata for a column chunk of a Parquet file.
///
/// Note this structure is returned by [`ColumnWriter::close`].
#[derive(Debug, Clone)]
pub struct ColumnCloseResult {
    /// The total number of bytes written
    pub bytes_written: u64,
    /// The total number of rows written
    pub rows_written: u64,
    /// Metadata for this column chunk
    pub metadata: ColumnChunkMetaData,
    /// Optional bloom filter for this column
    pub bloom_filter: Option<Sbbf>,
    /// Optional column index, for filtering
    pub column_index: Option<ColumnIndexMetaData>,
    /// Optional offset index, identifying page locations
    pub offset_index: Option<OffsetIndexMetaData>,
}
207
// Metrics per page, reset via `PageMetrics::new_page` when a page is finalized
#[derive(Default)]
struct PageMetrics {
    // Number of values buffered for the page currently being built
    num_buffered_values: u32,
    // Number of rows buffered (rows are counted at repetition level 0)
    num_buffered_rows: u32,
    // Number of null values seen in the current page
    num_page_nulls: u64,
    // Per-page level histograms; `None` unless statistics collection is enabled
    repetition_level_histogram: Option<LevelHistogram>,
    definition_level_histogram: Option<LevelHistogram>,
}
217
218impl PageMetrics {
219    fn new() -> Self {
220        Default::default()
221    }
222
223    /// Initialize the repetition level histogram
224    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
225        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
226        self
227    }
228
229    /// Initialize the definition level histogram
230    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
231        self.definition_level_histogram = LevelHistogram::try_new(max_level);
232        self
233    }
234
235    /// Resets the state of this `PageMetrics` to the initial state.
236    /// If histograms have been initialized their contents will be reset to zero.
237    fn new_page(&mut self) {
238        self.num_buffered_values = 0;
239        self.num_buffered_rows = 0;
240        self.num_page_nulls = 0;
241        self.repetition_level_histogram
242            .as_mut()
243            .map(LevelHistogram::reset);
244        self.definition_level_histogram
245            .as_mut()
246            .map(LevelHistogram::reset);
247    }
248}
249
// Metrics per column writer, accumulated across all pages of the chunk
#[derive(Default)]
struct ColumnMetrics<T: Default> {
    // Running totals across all pages written so far
    total_bytes_written: u64,
    total_rows_written: u64,
    total_uncompressed_size: u64,
    total_compressed_size: u64,
    total_num_values: u64,
    // Byte offsets within the file; set when the respective pages are written
    dictionary_page_offset: Option<u64>,
    data_page_offset: Option<u64>,
    // Chunk-level statistics (min/max/null/distinct counts)
    min_column_value: Option<T>,
    max_column_value: Option<T>,
    num_column_nulls: u64,
    column_distinct_count: Option<u64>,
    // Total bytes of variable-length values, when tracked by the encoder
    variable_length_bytes: Option<i64>,
    // Chunk-level level histograms; `None` unless statistics are enabled
    repetition_level_histogram: Option<LevelHistogram>,
    definition_level_histogram: Option<LevelHistogram>,
}
268
269impl<T: Default> ColumnMetrics<T> {
270    fn new() -> Self {
271        Default::default()
272    }
273
274    /// Initialize the repetition level histogram
275    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
276        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
277        self
278    }
279
280    /// Initialize the definition level histogram
281    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
282        self.definition_level_histogram = LevelHistogram::try_new(max_level);
283        self
284    }
285
286    /// Sum `page_histogram` into `chunk_histogram`
287    fn update_histogram(
288        chunk_histogram: &mut Option<LevelHistogram>,
289        page_histogram: &Option<LevelHistogram>,
290    ) {
291        if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
292            chunk_hist.add(page_hist);
293        }
294    }
295
296    /// Sum the provided PageMetrics histograms into the chunk histograms. Does nothing if
297    /// page histograms are not initialized.
298    fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
299        ColumnMetrics::<T>::update_histogram(
300            &mut self.definition_level_histogram,
301            &page_metrics.definition_level_histogram,
302        );
303        ColumnMetrics::<T>::update_histogram(
304            &mut self.repetition_level_histogram,
305            &page_metrics.repetition_level_histogram,
306        );
307    }
308
309    /// Sum the provided page variable_length_bytes into the chunk variable_length_bytes
310    fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
311        if let Some(var_bytes) = variable_length_bytes {
312            *self.variable_length_bytes.get_or_insert(0) += var_bytes;
313        }
314    }
315}
316
/// Borrowed view of level data, analogous to `&str` for `LevelData`'s `String`.
///
/// `LevelDataRef` can be constructed from `LevelData` and directly from an existing
/// `&[i16]` without allocating.
///
/// The variants are different physical representations of the same logical
/// sequence of levels.
#[derive(Debug, Clone, Copy)]
pub(crate) enum LevelDataRef<'a> {
    /// No levels were provided; logically an empty sequence.
    Absent,
    /// Levels backed by an explicit slice.
    Materialized(&'a [i16]),
    /// `count` repetitions of the same `value`, with no backing buffer.
    Uniform { value: i16, count: usize },
}
330
// A plain slice is viewed as materialized levels without copying.
impl<'a> From<&'a [i16]> for LevelDataRef<'a> {
    fn from(levels: &'a [i16]) -> Self {
        Self::Materialized(levels)
    }
}
336
// `None` maps to `Absent`; `Some(slice)` defers to the slice conversion above.
impl<'a> From<Option<&'a [i16]>> for LevelDataRef<'a> {
    fn from(levels: Option<&'a [i16]>) -> Self {
        levels.map_or(Self::Absent, Self::from)
    }
}
342
343impl<'a> LevelDataRef<'a> {
344    pub(crate) fn len(self) -> usize {
345        match self {
346            Self::Absent => 0,
347            Self::Materialized(values) => values.len(),
348            Self::Uniform { count, .. } => count,
349        }
350    }
351
352    pub(crate) fn first(self) -> Option<i16> {
353        match self {
354            Self::Absent => None,
355            Self::Materialized(values) => values.first().copied(),
356            Self::Uniform { value, count } => (count > 0).then_some(value),
357        }
358    }
359
360    #[cfg(feature = "arrow")]
361    pub(crate) fn value_at(self, idx: usize) -> Option<i16> {
362        match self {
363            Self::Absent => None,
364            Self::Materialized(values) => values.get(idx).copied(),
365            Self::Uniform { value, count } => (idx < count).then_some(value),
366        }
367    }
368
369    pub(crate) fn slice(self, offset: usize, len: usize) -> Self {
370        match self {
371            Self::Absent => Self::Absent,
372            Self::Materialized(values) => Self::Materialized(&values[offset..offset + len]),
373            Self::Uniform { value, .. } => Self::Uniform { value, count: len },
374        }
375    }
376}
377
/// Typed column writer for a primitive column.
///
/// Alias for [`GenericColumnWriter`] specialized with the built-in value
/// encoder for physical type `T`.
pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
380
/// Generic column writer for a primitive Parquet column
pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
    // Column writer properties
    descr: ColumnDescPtr,
    props: WriterPropertiesPtr,
    statistics_enabled: EnabledStatistics,

    // Sink for finished pages, plus the compression codec applied to them
    page_writer: Box<dyn PageWriter + 'a>,
    codec: Compression,
    compressor: Option<Box<dyn Codec>>,
    // Encodes values (and tracks dictionary/bloom filter state)
    encoder: E,

    // Metrics for the page currently being built
    page_metrics: PageMetrics,
    // Metrics per column writer
    column_metrics: ColumnMetrics<E::T>,

    /// The order of encodings within the generated metadata does not impact its meaning,
    /// but we use a BTreeSet so that the output is deterministic
    encodings: BTreeSet<Encoding>,
    encoding_stats: Vec<PageEncodingStats>,
    // Streaming level encoders for definition/repetition levels.
    def_levels_encoder: LevelEncoder,
    rep_levels_encoder: LevelEncoder,
    // Compressed pages buffered until flushed to `page_writer`
    data_pages: VecDeque<CompressedPage>,
    // column index and offset index
    column_index_builder: ColumnIndexBuilder,
    offset_index_builder: Option<OffsetIndexBuilder>,

    // Below fields used to incrementally check boundary order across data pages.
    // We assume they are ascending/descending until proven wrong.
    data_page_boundary_ascending: bool,
    data_page_boundary_descending: bool,
    /// (min, max)
    last_non_null_data_page_min_max: Option<(E::T, E::T)>,
}
416
417impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
    /// Returns a new instance of [`GenericColumnWriter`].
    ///
    /// Compression, statistics level, and index generation are resolved from
    /// `props` for this column's path.
    ///
    /// NOTE(review): the `unwrap`s assume codec and encoder creation cannot
    /// fail for properties built via the public builders — confirm upstream
    /// validation.
    pub fn new(
        descr: ColumnDescPtr,
        props: WriterPropertiesPtr,
        page_writer: Box<dyn PageWriter + 'a>,
    ) -> Self {
        let codec = props.compression(descr.path());
        let codec_options = CodecOptionsBuilder::default().build();
        let compressor = create_codec(codec, &codec_options).unwrap();
        let encoder = E::try_new(&descr, props.as_ref()).unwrap();

        let statistics_enabled = props.statistics_enabled(descr.path());

        let mut encodings = BTreeSet::new();
        // Used for level information
        encodings.insert(Encoding::RLE);

        let mut page_metrics = PageMetrics::new();
        let mut column_metrics = ColumnMetrics::<E::T>::new();

        // Initialize level histograms if collecting page or chunk statistics
        if statistics_enabled != EnabledStatistics::None {
            page_metrics = page_metrics
                .with_repetition_level_histogram(descr.max_rep_level())
                .with_definition_level_histogram(descr.max_def_level());
            column_metrics = column_metrics
                .with_repetition_level_histogram(descr.max_rep_level())
                .with_definition_level_histogram(descr.max_def_level())
        }

        // Disable column_index_builder if not collecting page statistics.
        let mut column_index_builder = ColumnIndexBuilder::new(descr.physical_type());
        if statistics_enabled != EnabledStatistics::Page {
            column_index_builder.to_invalid()
        }

        // Disable offset_index_builder if requested by user.
        let offset_index_builder = match props.offset_index_disabled() {
            false => Some(OffsetIndexBuilder::new()),
            _ => None,
        };

        Self {
            // Level encoders are built first, before `descr` and `props` are
            // moved into the struct fields below.
            def_levels_encoder: Self::create_level_encoder(descr.max_def_level(), &props),
            rep_levels_encoder: Self::create_level_encoder(descr.max_rep_level(), &props),
            descr,
            props,
            statistics_enabled,
            page_writer,
            codec,
            compressor,
            encoder,
            data_pages: VecDeque::new(),
            page_metrics,
            column_metrics,
            column_index_builder,
            offset_index_builder,
            encodings,
            encoding_stats: vec![],
            // Page boundaries are assumed sorted until a violating page is seen.
            data_page_boundary_ascending: true,
            data_page_boundary_descending: true,
            last_non_null_data_page_min_max: None,
        }
    }
482
    /// Shared implementation behind [`Self::write_batch`] and
    /// [`Self::write_batch_with_statistics`].
    ///
    /// Optionally folds caller-supplied `min`/`max`/`distinct_count` into the
    /// chunk metrics, then writes values in mini batches, splitting at record
    /// boundaries when repetition levels are materialized. Returns the total
    /// number of values written.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn write_batch_internal(
        &mut self,
        values: &E::Values,
        value_indices: Option<&[usize]>,
        def_levels: LevelDataRef<'_>,
        rep_levels: LevelDataRef<'_>,
        min: Option<&E::T>,
        max: Option<&E::T>,
        distinct_count: Option<u64>,
    ) -> Result<usize> {
        // Check if number of definition levels is the same as number of repetition levels.
        if def_levels.len() != 0 && rep_levels.len() != 0 && def_levels.len() != rep_levels.len() {
            return Err(general_err!(
                "Inconsistent length of definition and repetition levels: {} != {}",
                def_levels.len(),
                rep_levels.len()
            ));
        }

        // We check for DataPage limits only after we have inserted the values. If a user
        // writes a large number of values, the DataPage size can be well above the limit.
        //
        // The purpose of this chunking is to bound this. Even if a user writes large
        // number of values, the chunking will ensure that we add data page at a
        // reasonable pagesize limit.

        // TODO: find out why we don't account for size of levels when we estimate page
        // size.
        let num_levels = def_levels.len().max(rep_levels.len());
        // With no levels at all, each value is its own level (non-nullable,
        // non-repeated column).
        let num_levels = if num_levels > 0 {
            num_levels
        } else {
            value_indices.map_or(values.len(), |i| i.len())
        };

        if let Some(min) = min {
            update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
        }
        if let Some(max) = max {
            update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
        }

        // We can only set the distinct count if there are no other writes
        if self.encoder.num_values() == 0 {
            self.column_metrics.column_distinct_count = distinct_count;
        } else {
            self.column_metrics.column_distinct_count = None;
        }

        let mut values_offset = 0;
        let mut levels_offset = 0;
        let both_levels_compact = !matches!(def_levels, LevelDataRef::Materialized(_))
            && !matches!(rep_levels, LevelDataRef::Materialized(_));
        let has_levels = !matches!(def_levels, LevelDataRef::Absent)
            || !matches!(rep_levels, LevelDataRef::Absent);
        // When both level vectors are compact (Uniform or Absent), there is no
        // materialized slice to split and the per-mini-batch work is O(1), so we
        // can safely use a much larger batch size.
        let base_batch_size = if both_levels_compact && has_levels {
            self.props.data_page_row_count_limit()
        } else {
            self.props.write_batch_size()
        };
        while levels_offset < num_levels {
            let mut end_offset = num_levels.min(levels_offset + base_batch_size);

            // Split at record boundary: extend the batch until the next
            // repetition level of 0 so a record is never cut in half.
            if let LevelDataRef::Materialized(levels) = rep_levels {
                while end_offset < levels.len() && levels[end_offset] != 0 {
                    end_offset += 1;
                }
            }

            values_offset += self.write_mini_batch(
                values,
                values_offset,
                value_indices,
                end_offset - levels_offset,
                def_levels.slice(levels_offset, end_offset - levels_offset),
                rep_levels.slice(levels_offset, end_offset - levels_offset),
            )?;
            levels_offset = end_offset;
        }

        // Return total number of values processed.
        Ok(values_offset)
    }
571
572    /// Writes batch of values, definition levels and repetition levels.
573    /// Returns number of values processed (written).
574    ///
575    /// If definition and repetition levels are provided, we write fully those levels and
576    /// select how many values to write (this number will be returned), since number of
577    /// actual written values may be smaller than provided values.
578    ///
579    /// If only values are provided, then all values are written and the length of
580    /// of the values buffer is returned.
581    ///
582    /// Definition and/or repetition levels can be omitted, if values are
583    /// non-nullable and/or non-repeated.
584    pub fn write_batch(
585        &mut self,
586        values: &E::Values,
587        def_levels: Option<&[i16]>,
588        rep_levels: Option<&[i16]>,
589    ) -> Result<usize> {
590        self.write_batch_internal(
591            values,
592            None,
593            LevelDataRef::from(def_levels),
594            LevelDataRef::from(rep_levels),
595            None,
596            None,
597            None,
598        )
599    }
600
601    /// Writer may optionally provide pre-calculated statistics for use when computing
602    /// chunk-level statistics
603    ///
604    /// NB: [`WriterProperties::statistics_enabled`] must be set to [`EnabledStatistics::Chunk`]
605    /// for these statistics to take effect. If [`EnabledStatistics::None`] they will be ignored,
606    /// and if [`EnabledStatistics::Page`] the chunk statistics will instead be computed from the
607    /// computed page statistics
608    pub fn write_batch_with_statistics(
609        &mut self,
610        values: &E::Values,
611        def_levels: Option<&[i16]>,
612        rep_levels: Option<&[i16]>,
613        min: Option<&E::T>,
614        max: Option<&E::T>,
615        distinct_count: Option<u64>,
616    ) -> Result<usize> {
617        self.write_batch_internal(
618            values,
619            None,
620            LevelDataRef::from(def_levels),
621            LevelDataRef::from(rep_levels),
622            min,
623            max,
624            distinct_count,
625        )
626    }
627
    /// Returns the estimated total memory usage.
    ///
    /// Unlike [`Self::get_estimated_total_bytes`] this is an estimate
    /// of the current memory usage and not the final anticipated encoded size.
    #[cfg(feature = "arrow")]
    pub(crate) fn memory_size(&self) -> usize {
        // Bytes already written plus the encoder's in-memory buffers.
        self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
    }
636
    /// Returns total number of bytes written by this column writer so far.
    /// This value is also returned when column writer is closed.
    ///
    /// Note: this value does not include any buffered data that has not
    /// yet been flushed to a page.
    pub fn get_total_bytes_written(&self) -> u64 {
        self.column_metrics.total_bytes_written
    }
645
646    /// Returns the estimated total encoded bytes for this column writer.
647    ///
648    /// Unlike [`Self::get_total_bytes_written`] this includes an estimate
649    /// of any data that has not yet been flushed to a page, based on it's
650    /// anticipated encoded size.
651    #[cfg(feature = "arrow")]
652    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
653        self.data_pages
654            .iter()
655            .map(|page| page.data().len() as u64)
656            .sum::<u64>()
657            + self.column_metrics.total_bytes_written
658            + self.encoder.estimated_data_page_size() as u64
659            + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64
660    }
661
    /// Returns total number of rows written by this column writer so far.
    /// This value is also returned when column writer is closed.
    pub fn get_total_rows_written(&self) -> u64 {
        self.column_metrics.total_rows_written
    }
667
    /// Returns a reference to a [`ColumnDescPtr`] describing this column.
    pub fn get_descriptor(&self) -> &ColumnDescPtr {
        &self.descr
    }
672
    /// Finalizes writes and closes the column writer.
    /// Returns total bytes written, total rows written and column chunk metadata.
    pub fn close(mut self) -> Result<ColumnCloseResult> {
        // Flush any values still buffered into a final data page.
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }
        // The dictionary page (if any) is written before flushing data pages.
        if self.encoder.has_dictionary() {
            self.write_dictionary_page()?;
        }
        self.flush_data_pages()?;
        let metadata = self.build_column_metadata()?;
        self.page_writer.close()?;

        // Collapse the incrementally-tracked ordering flags into the final
        // boundary order for the column index.
        let boundary_order = match (
            self.data_page_boundary_ascending,
            self.data_page_boundary_descending,
        ) {
            // If the lists are composed of equal elements then will be marked as ascending
            // (Also the case if all pages are null pages)
            (true, _) => BoundaryOrder::ASCENDING,
            (false, true) => BoundaryOrder::DESCENDING,
            (false, false) => BoundaryOrder::UNORDERED,
        };
        self.column_index_builder.set_boundary_order(boundary_order);

        // Only emit a column index when the builder was kept valid (i.e. page
        // statistics were collected).
        let column_index = match self.column_index_builder.valid() {
            true => Some(self.column_index_builder.build()?),
            false => None,
        };

        let offset_index = self.offset_index_builder.map(|b| b.build());

        Ok(ColumnCloseResult {
            bytes_written: self.column_metrics.total_bytes_written,
            rows_written: self.column_metrics.total_rows_written,
            bloom_filter: self.encoder.flush_bloom_filter(),
            metadata,
            column_index,
            offset_index,
        })
    }
714
    /// Creates a new streaming level encoder appropriate for the writer version.
    ///
    /// The encoder variant is chosen from the configured writer version
    /// (v1 vs v2 streaming encoders).
    fn create_level_encoder(max_level: i16, props: &WriterProperties) -> LevelEncoder {
        match props.writer_version() {
            WriterVersion::PARQUET_1_0 => LevelEncoder::v1_streaming(max_level),
            WriterVersion::PARQUET_2_0 => LevelEncoder::v2_streaming(max_level),
        }
    }
722
723    /// Writes mini batch of values, definition and repetition levels.
724    /// This allows fine-grained processing of values and maintaining a reasonable
725    /// page size.
726    fn write_mini_batch(
727        &mut self,
728        values: &E::Values,
729        values_offset: usize,
730        value_indices: Option<&[usize]>,
731        num_levels: usize,
732        def_levels: LevelDataRef<'_>,
733        rep_levels: LevelDataRef<'_>,
734    ) -> Result<usize> {
735        // Process definition levels and determine how many values to write.
736        let values_to_write = if self.descr.max_def_level() > 0 {
737            let max_def = self.descr.max_def_level();
738            match def_levels {
739                LevelDataRef::Absent => {
740                    return Err(general_err!(
741                        "Definition levels are required, because max definition level = {}",
742                        self.descr.max_def_level()
743                    ));
744                }
745                LevelDataRef::Materialized(levels) => {
746                    // General path for caller-provided or already-materialized
747                    // level buffers.
748                    let mut values_to_write = 0usize;
749                    let encoder = &mut self.def_levels_encoder;
750                    match self.page_metrics.definition_level_histogram.as_mut() {
751                        Some(histogram) => encoder.put_with_observer(levels, |level, count| {
752                            values_to_write += count * (level == max_def) as usize;
753                            histogram.increment_by(level, count as i64);
754                        }),
755                        None => encoder.put_with_observer(levels, |level, count| {
756                            values_to_write += count * (level == max_def) as usize;
757                        }),
758                    };
759                    self.page_metrics.num_page_nulls += (levels.len() - values_to_write) as u64;
760                    values_to_write
761                }
762                LevelDataRef::Uniform { value, count } => {
763                    // Fast path for all-null, all-valid, or otherwise uniform
764                    // definition levels without materializing a level buffer.
765                    let encoder = &mut self.def_levels_encoder;
766                    match self.page_metrics.definition_level_histogram.as_mut() {
767                        Some(histogram) => {
768                            encoder.put_n_with_observer(value, count, |level, run_len| {
769                                histogram.increment_by(level, run_len as i64);
770                            })
771                        }
772                        None => encoder.put_n_with_observer(value, count, |_, _| {}),
773                    };
774                    let values_to_write = count * (value == max_def) as usize;
775                    self.page_metrics.num_page_nulls += (count - values_to_write) as u64;
776                    values_to_write
777                }
778            }
779        } else {
780            num_levels
781        };
782
783        // Process repetition levels and determine how many rows we are about to process.
784        if self.descr.max_rep_level() > 0 {
785            // A row could contain more than one value.
786            let first_level = rep_levels.first().ok_or_else(|| {
787                general_err!(
788                    "Repetition levels are required, because max repetition level = {}",
789                    self.descr.max_rep_level()
790                )
791            })?;
792
793            if first_level != 0 {
794                return Err(general_err!(
795                    "Write must start at a record boundary, got non-zero repetition level of {}",
796                    first_level
797                ));
798            }
799
800            let mut new_rows = 0u32;
801            match rep_levels {
802                LevelDataRef::Absent => unreachable!(),
803                LevelDataRef::Materialized(levels) => {
804                    let encoder = &mut self.rep_levels_encoder;
805                    match self.page_metrics.repetition_level_histogram.as_mut() {
806                        Some(histogram) => encoder.put_with_observer(levels, |level, count| {
807                            new_rows += (count as u32) * (level == 0) as u32;
808                            histogram.increment_by(level, count as i64);
809                        }),
810                        None => encoder.put_with_observer(levels, |level, count| {
811                            new_rows += (count as u32) * (level == 0) as u32;
812                        }),
813                    };
814                }
815                LevelDataRef::Uniform { value, count } => {
816                    let encoder = &mut self.rep_levels_encoder;
817                    match self.page_metrics.repetition_level_histogram.as_mut() {
818                        Some(histogram) => {
819                            encoder.put_n_with_observer(value, count, |level, run_len| {
820                                new_rows += (run_len as u32) * (level == 0) as u32;
821                                histogram.increment_by(level, run_len as i64);
822                            })
823                        }
824                        None => encoder.put_n_with_observer(value, count, |level, run_len| {
825                            new_rows += (run_len as u32) * (level == 0) as u32;
826                        }),
827                    };
828                }
829            }
830            self.page_metrics.num_buffered_rows += new_rows;
831        } else {
832            // Each value is exactly one row.
833            // Equals to the number of values, we count nulls as well.
834            self.page_metrics.num_buffered_rows += num_levels as u32;
835        }
836
837        match value_indices {
838            Some(indices) => {
839                let indices = &indices[values_offset..values_offset + values_to_write];
840                self.encoder.write_gather(values, indices)?;
841            }
842            None => self.encoder.write(values, values_offset, values_to_write)?,
843        }
844
845        self.page_metrics.num_buffered_values += num_levels as u32;
846
847        if self.should_add_data_page() {
848            self.add_data_page()?;
849        }
850
851        if self.should_dict_fallback() {
852            self.dict_fallback()?;
853        }
854
855        Ok(values_to_write)
856    }
857
858    /// Returns true if we need to fall back to non-dictionary encoding.
859    ///
860    /// We can only fall back if dictionary encoder is set and we have exceeded dictionary
861    /// size.
862    #[inline]
863    fn should_dict_fallback(&self) -> bool {
864        match self.encoder.estimated_dict_page_size() {
865            Some(size) => {
866                size >= self
867                    .props
868                    .column_dictionary_page_size_limit(self.descr.path())
869            }
870            None => false,
871        }
872    }
873
874    /// Returns true if there is enough data for a data page, false otherwise.
875    #[inline]
876    fn should_add_data_page(&self) -> bool {
877        // This is necessary in the event of a much larger dictionary size than page size
878        //
879        // In such a scenario the dictionary decoder may return an estimated encoded
880        // size in excess of the page size limit, even when there are no buffered values
881        if self.page_metrics.num_buffered_values == 0 {
882            return false;
883        }
884
885        self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
886            || self.encoder.estimated_data_page_size()
887                >= self.props.column_data_page_size_limit(self.descr.path())
888    }
889
890    /// Performs dictionary fallback.
891    /// Prepares and writes dictionary and all data pages into page writer.
892    fn dict_fallback(&mut self) -> Result<()> {
893        // At this point we know that we need to fall back.
894        if self.page_metrics.num_buffered_values > 0 {
895            self.add_data_page()?;
896        }
897        self.write_dictionary_page()?;
898        self.flush_data_pages()?;
899        Ok(())
900    }
901
    /// Update the column index and offset index when adding the data page
    ///
    /// `page_statistics` carries the min/max for the page being added, if
    /// page-level statistics were computed; `page_variable_length_bytes` is
    /// the unencoded byte-array size for the page, if applicable.
    fn update_column_offset_index(
        &mut self,
        page_statistics: Option<&ValueStatistics<E::T>>,
        page_variable_length_bytes: Option<i64>,
    ) {
        // update the column index
        let null_page =
            (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
        // a page contains only null values,
        // and writers have to set the corresponding entries in min_values and max_values to byte[0]
        if null_page && self.column_index_builder.valid() {
            self.column_index_builder.append(
                null_page,
                vec![],
                vec![],
                self.page_metrics.num_page_nulls as i64,
            );
        } else if self.column_index_builder.valid() {
            // from page statistics
            // If can't get the page statistics, ignore this column/offset index for this column chunk
            match &page_statistics {
                None => {
                    self.column_index_builder.to_invalid();
                }
                Some(stat) => {
                    // Check if min/max are still ascending/descending across pages
                    let new_min = stat.min_opt().unwrap();
                    let new_max = stat.max_opt().unwrap();
                    if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
                        if self.data_page_boundary_ascending {
                            // If last min/max are greater than new min/max then not ascending anymore
                            let not_ascending = compare_greater(&self.descr, last_min, new_min)
                                || compare_greater(&self.descr, last_max, new_max);
                            if not_ascending {
                                self.data_page_boundary_ascending = false;
                            }
                        }

                        if self.data_page_boundary_descending {
                            // If new min/max are greater than last min/max then not descending anymore
                            let not_descending = compare_greater(&self.descr, new_min, last_min)
                                || compare_greater(&self.descr, new_max, last_max);
                            if not_descending {
                                self.data_page_boundary_descending = false;
                            }
                        }
                    }
                    // Remember this page's bounds for the next boundary-order check.
                    self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));

                    if self.can_truncate_value() {
                        self.column_index_builder.append(
                            null_page,
                            self.truncate_min_value(
                                self.props.column_index_truncate_length(),
                                stat.min_bytes_opt().unwrap(),
                            )
                            .0,
                            self.truncate_max_value(
                                self.props.column_index_truncate_length(),
                                stat.max_bytes_opt().unwrap(),
                            )
                            .0,
                            self.page_metrics.num_page_nulls as i64,
                        );
                    } else {
                        // Truncation not permitted for this type: store full bytes.
                        self.column_index_builder.append(
                            null_page,
                            stat.min_bytes_opt().unwrap().to_vec(),
                            stat.max_bytes_opt().unwrap().to_vec(),
                            self.page_metrics.num_page_nulls as i64,
                        );
                    }
                }
            }
        }

        // Append page histograms to the `ColumnIndex` histograms
        self.column_index_builder.append_histograms(
            &self.page_metrics.repetition_level_histogram,
            &self.page_metrics.definition_level_histogram,
        );

        // Update the offset index
        if let Some(builder) = self.offset_index_builder.as_mut() {
            builder.append_row_count(self.page_metrics.num_buffered_rows as i64);
            builder.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
        }
    }
991
992    /// Determine if we should allow truncating min/max values for this column's statistics
993    fn can_truncate_value(&self) -> bool {
994        match self.descr.physical_type() {
995            // Don't truncate for Float16 and Decimal because their sort order is different
996            // from that of FIXED_LEN_BYTE_ARRAY sort order.
997            // So truncation of those types could lead to inaccurate min/max statistics
998            Type::FIXED_LEN_BYTE_ARRAY
999                if !matches!(
1000                    self.descr.logical_type_ref(),
1001                    Some(&LogicalType::Decimal { .. }) | Some(&LogicalType::Float16)
1002                ) =>
1003            {
1004                true
1005            }
1006            Type::BYTE_ARRAY => true,
1007            // Truncation only applies for fba/binary physical types
1008            _ => false,
1009        }
1010    }
1011
1012    /// Returns `true` if this column's logical type is a UTF-8 string.
1013    fn is_utf8(&self) -> bool {
1014        self.get_descriptor().logical_type_ref() == Some(&LogicalType::String)
1015            || self.get_descriptor().converted_type() == ConvertedType::UTF8
1016    }
1017
    /// Truncates a binary statistic to at most `truncation_length` bytes.
    ///
    /// If truncation is not possible, returns `data`.
    ///
    /// The `bool` in the returned tuple indicates whether truncation occurred or not.
    ///
    /// UTF-8 Note:
    /// If the column type indicates UTF-8, and `data` contains valid UTF-8, then the result will
    /// also remain valid UTF-8, but may be less than `truncation_length` bytes to avoid splitting
    /// on non-character boundaries.
    fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
        truncation_length
            // only truncate if the data is actually longer than the limit
            .filter(|l| data.len() > *l)
            .and_then(|l|
                // don't do extra work if this column isn't UTF-8
                if self.is_utf8() {
                    match str::from_utf8(data) {
                        // truncate on a character boundary; may return None
                        Ok(str_data) => truncate_utf8(str_data, l),
                        // not valid UTF-8 after all: plain byte truncation
                        Err(_) => Some(data[..l].to_vec()),
                    }
                } else {
                    Some(data[..l].to_vec())
                }
            )
            .map(|truncated| (truncated, true))
            // no truncation performed: return the data unchanged
            .unwrap_or_else(|| (data.to_vec(), false))
    }
1045
1046    /// Truncates a binary statistic to at most `truncation_length` bytes, and then increment the
1047    /// final byte(s) to yield a valid upper bound. This may result in a result of less than
1048    /// `truncation_length` bytes if the last byte(s) overflows.
1049    ///
1050    /// If truncation is not possible, returns `data`.
1051    ///
1052    /// The `bool` in the returned tuple indicates whether truncation occurred or not.
1053    ///
1054    /// UTF-8 Note:
1055    /// If the column type indicates UTF-8, and `data` contains valid UTF-8, then the result will
1056    /// also remain valid UTF-8 (but again may be less than `truncation_length` bytes). If `data`
1057    /// does not contain valid UTF-8, then truncation will occur as if the column is non-string
1058    /// binary.
1059    fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
1060        truncation_length
1061            .filter(|l| data.len() > *l)
1062            .and_then(|l|
1063                // don't do extra work if this column isn't UTF-8
1064                if self.is_utf8() {
1065                    match str::from_utf8(data) {
1066                        Ok(str_data) => truncate_and_increment_utf8(str_data, l),
1067                        Err(_) => increment(data[..l].to_vec()),
1068                    }
1069                } else {
1070                    increment(data[..l].to_vec())
1071                }
1072            )
1073            .map(|truncated| (truncated, true))
1074            .unwrap_or_else(|| (data.to_vec(), false))
1075    }
1076
    /// Truncate the min and max values that will be written to a data page
    /// header or column chunk Statistics
    ///
    /// Only ByteArray and (truncatable) FixedLenByteArray statistics are
    /// affected; all other statistics pass through unchanged. When truncation
    /// occurs, the corresponding `is_exact` flag is cleared on the result.
    fn truncate_statistics(&self, statistics: Statistics) -> Statistics {
        // Signed sort orders keep the legacy (backwards compatible) min/max fields.
        let backwards_compatible_min_max = self.descr.sort_order().is_signed();
        match statistics {
            Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
                let (min, did_truncate_min) = self.truncate_min_value(
                    self.props.statistics_truncate_length(),
                    stats.min_bytes_opt().unwrap(),
                );
                let (max, did_truncate_max) = self.truncate_max_value(
                    self.props.statistics_truncate_length(),
                    stats.max_bytes_opt().unwrap(),
                );
                Statistics::ByteArray(
                    ValueStatistics::new(
                        Some(min.into()),
                        Some(max.into()),
                        stats.distinct_count(),
                        stats.null_count_opt(),
                        backwards_compatible_min_max,
                    )
                    .with_max_is_exact(!did_truncate_max)
                    .with_min_is_exact(!did_truncate_min),
                )
            }
            // FixedLenByteArray is only truncated when the logical type permits it.
            Statistics::FixedLenByteArray(stats)
                if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
            {
                let (min, did_truncate_min) = self.truncate_min_value(
                    self.props.statistics_truncate_length(),
                    stats.min_bytes_opt().unwrap(),
                );
                let (max, did_truncate_max) = self.truncate_max_value(
                    self.props.statistics_truncate_length(),
                    stats.max_bytes_opt().unwrap(),
                );
                Statistics::FixedLenByteArray(
                    ValueStatistics::new(
                        Some(min.into()),
                        Some(max.into()),
                        stats.distinct_count(),
                        stats.null_count_opt(),
                        backwards_compatible_min_max,
                    )
                    .with_max_is_exact(!did_truncate_max)
                    .with_min_is_exact(!did_truncate_min),
                )
            }
            // All other statistics types are returned unchanged.
            stats => stats,
        }
    }
1129
    /// Adds data page.
    /// Data page is either buffered in case of dictionary encoding or written directly.
    pub(crate) fn add_data_page(&mut self) -> Result<()> {
        // Extract encoded values
        let values_data = self.encoder.flush_data_page()?;

        let max_def_level = self.descr.max_def_level();
        let max_rep_level = self.descr.max_rep_level();

        // Fold this page's null count into the chunk-level total.
        self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;

        let page_statistics = match (values_data.min_value, values_data.max_value) {
            (Some(min), Some(max)) => {
                // Update chunk level statistics
                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);

                // Page-level statistics are only materialized when enabled.
                (self.statistics_enabled == EnabledStatistics::Page).then_some(
                    ValueStatistics::new(
                        Some(min),
                        Some(max),
                        None,
                        Some(self.page_metrics.num_page_nulls),
                        false,
                    ),
                )
            }
            _ => None,
        };

        // update column and offset index
        self.update_column_offset_index(
            page_statistics.as_ref(),
            values_data.variable_length_bytes,
        );

        // Update histograms and variable_length_bytes in column_metrics
        self.column_metrics
            .update_from_page_metrics(&self.page_metrics);
        self.column_metrics
            .update_variable_length_bytes(values_data.variable_length_bytes);

        // From here on, we only need page statistics if they will be written to the page header.
        let page_statistics = page_statistics
            .filter(|_| self.props.write_page_header_statistics(self.descr.path()))
            .map(|stats| self.truncate_statistics(Statistics::from(stats)));

        let compressed_page = match self.props.writer_version() {
            WriterVersion::PARQUET_1_0 => {
                // V1 pages concatenate rep levels, def levels, and values,
                // and compress the resulting buffer as a single unit.
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    self.rep_levels_encoder
                        .flush_to(|data| buffer.extend_from_slice(data));
                }

                if max_def_level > 0 {
                    self.def_levels_encoder
                        .flush_to(|data| buffer.extend_from_slice(data));
                }

                buffer.extend_from_slice(&values_data.buf);
                let uncompressed_size = buffer.len();

                if let Some(ref mut cmpr) = self.compressor {
                    let mut compressed_buf = Vec::with_capacity(uncompressed_size);
                    cmpr.compress(&buffer[..], &mut compressed_buf)?;
                    compressed_buf.shrink_to_fit();
                    buffer = compressed_buf;
                }

                let data_page = Page::DataPage {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    def_level_encoding: Encoding::RLE,
                    rep_level_encoding: Encoding::RLE,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
            WriterVersion::PARQUET_2_0 => {
                let mut rep_levels_byte_len = 0;
                let mut def_levels_byte_len = 0;
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    self.rep_levels_encoder
                        .flush_to(|data| buffer.extend_from_slice(data));
                    rep_levels_byte_len = buffer.len();
                }

                if max_def_level > 0 {
                    self.def_levels_encoder
                        .flush_to(|data| buffer.extend_from_slice(data));
                    def_levels_byte_len = buffer.len() - rep_levels_byte_len;
                }

                let uncompressed_size =
                    rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();

                // Data Page v2 compresses values only.
                let is_compressed = match self.compressor {
                    Some(ref mut cmpr) => {
                        let buffer_len = buffer.len();
                        cmpr.compress(&values_data.buf, &mut buffer)?;
                        let compressed_values_size = buffer.len() - buffer_len;
                        let threshold = self
                            .props
                            .column_data_page_v2_compression_ratio_threshold(self.descr.path());
                        // Keep the values uncompressed if compression did not
                        // shrink them below the configured ratio threshold.
                        if (compressed_values_size as f64) >= (uncompressed_size as f64) * threshold
                        {
                            buffer.truncate(buffer_len);
                            buffer.extend_from_slice(&values_data.buf);
                            false
                        } else {
                            true
                        }
                    }
                    None => {
                        buffer.extend_from_slice(&values_data.buf);
                        false
                    }
                };

                let data_page = Page::DataPageV2 {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    num_nulls: self.page_metrics.num_page_nulls as u32,
                    num_rows: self.page_metrics.num_buffered_rows,
                    def_levels_byte_len: def_levels_byte_len as u32,
                    rep_levels_byte_len: rep_levels_byte_len as u32,
                    is_compressed,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
        };

        // Check if we need to buffer data page or flush it to the sink directly.
        if self.encoder.has_dictionary() {
            self.data_pages.push_back(compressed_page);
        } else {
            self.write_data_page(compressed_page)?;
        }

        // Update total number of rows.
        self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;
        // Reset per-page metrics for the next page.
        self.page_metrics.new_page();

        Ok(())
    }
1285
1286    /// Finalises any outstanding data pages and flushes buffered data pages from
1287    /// dictionary encoding into underlying sink.
1288    #[inline]
1289    fn flush_data_pages(&mut self) -> Result<()> {
1290        // Write all outstanding data to a new page.
1291        if self.page_metrics.num_buffered_values > 0 {
1292            self.add_data_page()?;
1293        }
1294
1295        while let Some(page) = self.data_pages.pop_front() {
1296            self.write_data_page(page)?;
1297        }
1298
1299        Ok(())
1300    }
1301
    /// Assembles column chunk metadata.
    ///
    /// Collects the accumulated column metrics (sizes, offsets, encodings)
    /// and, when statistics are enabled, the chunk-level statistics and level
    /// histograms. Note: the level histograms are `take()`n out of
    /// `column_metrics`, so this is expected to be called once per chunk.
    fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
        let total_compressed_size = self.column_metrics.total_compressed_size as i64;
        let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
        let num_values = self.column_metrics.total_num_values as i64;
        let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
        // If data page offset is not set, then no pages have been written
        let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;

        let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
            .set_compression(self.codec)
            .set_encodings_mask(EncodingMask::new_from_encodings(self.encodings.iter()))
            .set_page_encoding_stats(self.encoding_stats.clone())
            .set_total_compressed_size(total_compressed_size)
            .set_total_uncompressed_size(total_uncompressed_size)
            .set_num_values(num_values)
            .set_data_page_offset(data_page_offset)
            .set_dictionary_page_offset(dict_page_offset);

        if self.statistics_enabled != EnabledStatistics::None {
            // Signed sort orders keep the legacy (backwards compatible) min/max fields.
            let backwards_compatible_min_max = self.descr.sort_order().is_signed();

            let statistics = ValueStatistics::<E::T>::new(
                self.column_metrics.min_column_value.clone(),
                self.column_metrics.max_column_value.clone(),
                self.column_metrics.column_distinct_count,
                Some(self.column_metrics.num_column_nulls),
                false,
            )
            .with_backwards_compatible_min_max(backwards_compatible_min_max)
            .into();

            // Apply the configured statistics truncation before persisting.
            let statistics = self.truncate_statistics(statistics);

            builder = builder
                .set_statistics(statistics)
                .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
                .set_repetition_level_histogram(
                    self.column_metrics.repetition_level_histogram.take(),
                )
                .set_definition_level_histogram(
                    self.column_metrics.definition_level_histogram.take(),
                );

            if let Some(geo_stats) = self.encoder.flush_geospatial_statistics() {
                builder = builder.set_geo_statistics(geo_stats);
            }
        }

        builder = self.set_column_chunk_encryption_properties(builder);

        let metadata = builder.build()?;
        Ok(metadata)
    }
1356
    /// Writes compressed data page into underlying sink and updates global metrics.
    #[inline]
    fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
        self.encodings.insert(page.encoding());
        // Coalesce runs of pages with the same type and encoding into a single
        // `PageEncodingStats` entry instead of pushing one entry per page.
        match self.encoding_stats.last_mut() {
            Some(encoding_stats)
                if encoding_stats.page_type == page.page_type()
                    && encoding_stats.encoding == page.encoding() =>
            {
                encoding_stats.count += 1;
            }
            _ => {
                // data page type does not change inside a file
                // encoding can currently only change from dictionary to non-dictionary once
                self.encoding_stats.push(PageEncodingStats {
                    page_type: page.page_type(),
                    encoding: page.encoding(),
                    count: 1,
                });
            }
        }
        let page_spec = self.page_writer.write_page(page)?;
        // update offset index
        // compressed_size = header_size + compressed_data_size
        if let Some(builder) = self.offset_index_builder.as_mut() {
            builder
                .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32)
        }
        self.update_metrics_for_page(page_spec);
        Ok(())
    }
1388
    /// Writes dictionary page into underlying sink.
    ///
    /// Returns an error if no dictionary encoder is active.
    #[inline]
    fn write_dictionary_page(&mut self) -> Result<()> {
        let compressed_page = {
            let mut page = self
                .encoder
                .flush_dict_page()?
                .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;

            let uncompressed_size = page.buf.len();

            // Compress the dictionary payload in place when a codec is configured.
            if let Some(ref mut cmpr) = self.compressor {
                let mut output_buf = Vec::with_capacity(uncompressed_size);
                cmpr.compress(&page.buf, &mut output_buf)?;
                page.buf = Bytes::from(output_buf);
            }

            let dict_page = Page::DictionaryPage {
                buf: page.buf,
                num_values: page.num_values as u32,
                encoding: self.props.dictionary_page_encoding(),
                is_sorted: page.is_sorted,
            };
            CompressedPage::new(dict_page, uncompressed_size)
        };

        self.encodings.insert(compressed_page.encoding());
        self.encoding_stats.push(PageEncodingStats {
            page_type: PageType::DICTIONARY_PAGE,
            encoding: compressed_page.encoding(),
            count: 1,
        });
        let page_spec = self.page_writer.write_page(compressed_page)?;
        self.update_metrics_for_page(page_spec);
        // For the dictionary page, don't need to update column/offset index.
        Ok(())
    }
1426
1427    /// Updates column writer metrics with each page metadata.
1428    #[inline]
1429    fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
1430        self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
1431        self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
1432        self.column_metrics.total_bytes_written += page_spec.bytes_written;
1433
1434        match page_spec.page_type {
1435            PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
1436                self.column_metrics.total_num_values += page_spec.num_values as u64;
1437                if self.column_metrics.data_page_offset.is_none() {
1438                    self.column_metrics.data_page_offset = Some(page_spec.offset);
1439                }
1440            }
1441            PageType::DICTIONARY_PAGE => {
1442                assert!(
1443                    self.column_metrics.dictionary_page_offset.is_none(),
1444                    "Dictionary offset is already set"
1445                );
1446                self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
1447            }
1448            _ => {}
1449        }
1450    }
1451
1452    #[inline]
1453    #[cfg(feature = "encryption")]
1454    fn set_column_chunk_encryption_properties(
1455        &self,
1456        builder: ColumnChunkMetaDataBuilder,
1457    ) -> ColumnChunkMetaDataBuilder {
1458        if let Some(encryption_properties) = self.props.file_encryption_properties.as_ref() {
1459            builder.set_column_crypto_metadata(get_column_crypto_metadata(
1460                encryption_properties,
1461                &self.descr,
1462            ))
1463        } else {
1464            builder
1465        }
1466    }
1467
    /// No-op counterpart for builds without the `encryption` feature:
    /// returns the builder unchanged so callers need no cfg-gating.
    #[inline]
    #[cfg(not(feature = "encryption"))]
    fn set_column_chunk_encryption_properties(
        &self,
        builder: ColumnChunkMetaDataBuilder,
    ) -> ColumnChunkMetaDataBuilder {
        builder
    }
1476}
1477
/// Replaces `min` with `val` when `val` is smaller (or `min` is unset),
/// skipping NaN values; comparison honors the column's logical type.
fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
    // Update when the current minimum compares greater than `val`.
    update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val))
}
1481
/// Replaces `max` with `val` when `val` is greater (or `max` is unset),
/// skipping NaN values; comparison honors the column's logical type.
fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) {
    // Update when `val` compares greater than the current maximum.
    update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur))
}
1485
1486#[inline]
1487#[allow(clippy::eq_op)]
1488fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
1489    match T::PHYSICAL_TYPE {
1490        Type::FLOAT | Type::DOUBLE => val != val,
1491        Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type_ref() == Some(&LogicalType::Float16) => {
1492            let val = val.as_bytes();
1493            let val = f16::from_le_bytes([val[0], val[1]]);
1494            val.is_nan()
1495        }
1496        _ => false,
1497    }
1498}
1499
1500/// Perform a conditional update of `cur`, skipping any NaN values
1501///
1502/// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with
1503/// the value of `cur`, and updates `cur` to `Some(val)` if it returns `true`
1504fn update_stat<T: ParquetValueType, F>(
1505    descr: &ColumnDescriptor,
1506    val: &T,
1507    cur: &mut Option<T>,
1508    should_update: F,
1509) where
1510    F: Fn(&T) -> bool,
1511{
1512    if is_nan(descr, val) {
1513        return;
1514    }
1515
1516    if cur.as_ref().is_none_or(should_update) {
1517        *cur = Some(val.clone());
1518    }
1519}
1520
1521/// Evaluate `a > b` according to underlying logical type.
1522fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
1523    match T::PHYSICAL_TYPE {
1524        Type::INT32 | Type::INT64 => {
1525            if let Some(LogicalType::Integer {
1526                is_signed: false, ..
1527            }) = descr.logical_type_ref()
1528            {
1529                // need to compare unsigned
1530                return compare_greater_unsigned_int(a, b);
1531            }
1532
1533            match descr.converted_type() {
1534                ConvertedType::UINT_8
1535                | ConvertedType::UINT_16
1536                | ConvertedType::UINT_32
1537                | ConvertedType::UINT_64 => {
1538                    return compare_greater_unsigned_int(a, b);
1539                }
1540                _ => {}
1541            };
1542        }
1543        Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1544            if let Some(LogicalType::Decimal { .. }) = descr.logical_type_ref() {
1545                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1546            }
1547            if let ConvertedType::DECIMAL = descr.converted_type() {
1548                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1549            }
1550            if let Some(LogicalType::Float16) = descr.logical_type_ref() {
1551                return compare_greater_f16(a.as_bytes(), b.as_bytes());
1552            }
1553        }
1554
1555        _ => {}
1556    }
1557
1558    // compare independent of logical / converted type
1559    a > b
1560}
1561
1562// ----------------------------------------------------------------------
1563// Encoding support for column writer.
1564// This mirrors parquet-mr default encodings for writes. See:
1565// https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java
1566// https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java
1567
1568/// Returns encoding for a column when no other encoding is provided in writer properties.
1569fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
1570    match (kind, props.writer_version()) {
1571        (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
1572        (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1573        (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1574        (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1575        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1576        _ => Encoding::PLAIN,
1577    }
1578}
1579
1580/// Returns true if dictionary is supported for column writer, false otherwise.
1581fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
1582    match (kind, props.writer_version()) {
1583        // Booleans do not support dict encoding and should use a fallback encoding.
1584        (Type::BOOLEAN, _) => false,
1585        // Dictionary encoding was not enabled in PARQUET 1.0
1586        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
1587        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
1588        _ => true,
1589    }
1590}
1591
/// Compares two integer values as unsigned quantities.
///
/// Only reached from `compare_greater` for INT32/INT64 columns, where
/// `as_u64` always succeeds, so the unwraps cannot panic.
#[inline]
fn compare_greater_unsigned_int<T: ParquetValueType>(a: &T, b: &T) -> bool {
    a.as_u64().unwrap() > b.as_u64().unwrap()
}
1596
/// Compares two Float16 values stored as 2-byte little-endian payloads.
///
/// Only reached from `compare_greater` for Float16 logical-type columns;
/// the `try_into().unwrap()` panics if a payload is not exactly 2 bytes.
#[inline]
fn compare_greater_f16(a: &[u8], b: &[u8]) -> bool {
    let a = f16::from_le_bytes(a.try_into().unwrap());
    let b = f16::from_le_bytes(b.try_into().unwrap());
    a > b
}
1603
/// Signed comparison of big-endian two's-complement byte arrays.
///
/// Returns `true` iff the integer encoded by `a` is strictly greater than the
/// integer encoded by `b`. The arrays may have different lengths; the shorter
/// one is treated as sign-extended to the longer length.
///
/// Fix: when the lengths differ and the longer array's lead bytes are all
/// sign-extension bytes, the remainders must be compared at equal lengths
/// (longer array stripped of its lead). The previous code compared
/// `a[1..] > b[1..]`, which misaligns the operands in that case — e.g. it
/// reported `[0x00, 0x10]` (= 16) greater than `[0x20]` (= 32).
fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
    let a_length = a.len();
    let b_length = b.len();

    // An empty array is the smallest possible value: a non-empty `a` wins,
    // an empty `a` never does.
    if a_length == 0 || b_length == 0 {
        return a_length > 0;
    }

    let first_a: u8 = a[0];
    let first_b: u8 = b[0];

    // Short circuit for different signs, or for equal-length arrays whose
    // first bytes differ (with equal lengths the first byte decides; with
    // unequal lengths sign extension must be considered first).
    if (0x80 & first_a) != (0x80 & first_b) || (a_length == b_length && first_a != first_b) {
        return (first_a as i8) > (first_b as i8);
    }

    // Same sign from here on. The byte that sign extension would prepend to
    // the shorter array: 0xFF for negative values, 0x00 for non-negative.
    let extension: u8 = if (first_a as i8) < 0 { 0xFF } else { 0 };

    if a_length != b_length {
        let lead_length = a_length.abs_diff(b_length);
        // If the longer array's lead bytes are not pure sign extension, its
        // extra magnitude decides the comparison outright.
        let not_equal = if a_length > b_length {
            a[0..lead_length].iter().any(|&x| x != extension)
        } else {
            b[0..lead_length].iter().any(|&x| x != extension)
        };

        if not_equal {
            let negative_values: bool = (first_a as i8) < 0;
            let a_longer: bool = a_length > b_length;
            // A longer magnitude means a larger positive value but a smaller
            // (more negative) negative value.
            return if negative_values { !a_longer } else { a_longer };
        }

        // The longer array's lead is pure sign extension: strip it and
        // compare the equal-length remainders. The shared sign is already
        // accounted for, so unsigned lexicographic comparison is correct.
        return if a_length > b_length {
            (a[lead_length..]) > (b[..])
        } else {
            (a[..]) > (b[lead_length..])
        };
    }

    // Equal lengths with equal first bytes (guaranteed by the short circuit
    // above): unsigned lexicographic comparison of the remaining bytes.
    (a[1..]) > (b[1..])
}
1649
/// Truncate a UTF-8 slice to the longest prefix that is still a valid UTF-8
/// string, while being at most `length` bytes and non-empty. Returns `None`
/// if truncation is not possible within those constraints.
///
/// The caller guarantees that data.len() > length.
fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    // Walk backwards from `length` to the nearest char boundary.
    let mut end = length;
    loop {
        if end == 0 {
            return None;
        }
        if data.is_char_boundary(end) {
            return Some(data.as_bytes()[..end].to_vec());
        }
        end -= 1;
    }
}
1659
1660/// Truncate a UTF-8 slice and increment it's final character. The returned value is the
1661/// longest such slice that is still a valid UTF-8 string while being less than `length`
1662/// bytes and non-empty. Returns `None` if no such transformation is possible.
1663///
1664/// The caller guarantees that data.len() > length.
1665fn truncate_and_increment_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
1666    // UTF-8 is max 4 bytes, so start search 3 back from desired length
1667    let lower_bound = length.saturating_sub(3);
1668    let split = (lower_bound..=length).rfind(|x| data.is_char_boundary(*x))?;
1669    increment_utf8(data.get(..split)?)
1670}
1671
/// Increment the final character in a UTF-8 string such that the result is
/// still valid UTF-8. The result may be shorter than the input if the last
/// character(s) cannot be incremented (overflow or invalid code points).
/// Returns `None` if no character in the string can be incremented.
///
/// Note that this implementation will not promote an N-byte code point to
/// (N+1) bytes.
fn increment_utf8(data: &str) -> Option<Vec<u8>> {
    for (idx, ch) in data.char_indices().rev() {
        let width = ch.len_utf8();
        match char::from_u32(ch as u32 + 1) {
            // Only accept a successor that encodes to the same byte width, so
            // the prefix length bound is preserved.
            Some(next) if next.len_utf8() == width => {
                let mut out = data.as_bytes()[..idx + width].to_vec();
                next.encode_utf8(&mut out[idx..]);
                return Some(out);
            }
            _ => {}
        }
    }

    None
}
1693
/// Try to increment the byte string, carrying from right to left.
///
/// Returns `None` if every byte is `u8::MAX` (the increment overflows).
fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
    for idx in (0..data.len()).rev() {
        let (bumped, carried) = data[idx].overflowing_add(1);
        data[idx] = bumped;

        // No carry means the increment is complete.
        if !carried {
            return Some(data);
        }
    }

    None
}
1709
1710#[cfg(test)]
1711mod tests {
1712    use crate::{
1713        file::{properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, writer::SerializedFileWriter},
1714        schema::parser::parse_message_type,
1715    };
1716    use core::str;
1717    use rand::distr::uniform::SampleUniform;
1718    use std::{fs::File, sync::Arc};
1719
1720    use crate::column::{
1721        page::PageReader,
1722        reader::{ColumnReaderImpl, get_column_reader, get_typed_column_reader},
1723    };
1724    use crate::file::writer::TrackedWrite;
1725    use crate::file::{
1726        properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
1727    };
1728    use crate::schema::types::{ColumnPath, Type as SchemaType};
1729    use crate::util::test_common::rand_gen::random_numbers_range;
1730
1731    use super::*;
1732
1733    #[test]
1734    fn test_column_writer_inconsistent_def_rep_length() {
1735        let page_writer = get_test_page_writer();
1736        let props = Default::default();
1737        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 1, props);
1738        let res = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0]));
1739        assert!(res.is_err());
1740        if let Err(err) = res {
1741            assert_eq!(
1742                format!("{err}"),
1743                "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2"
1744            );
1745        }
1746    }
1747
1748    #[test]
1749    fn test_column_writer_invalid_def_levels() {
1750        let page_writer = get_test_page_writer();
1751        let props = Default::default();
1752        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1753        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1754        assert!(res.is_err());
1755        if let Err(err) = res {
1756            assert_eq!(
1757                format!("{err}"),
1758                "Parquet error: Definition levels are required, because max definition level = 1"
1759            );
1760        }
1761    }
1762
1763    #[test]
1764    fn test_column_writer_invalid_rep_levels() {
1765        let page_writer = get_test_page_writer();
1766        let props = Default::default();
1767        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 1, props);
1768        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1769        assert!(res.is_err());
1770        if let Err(err) = res {
1771            assert_eq!(
1772                format!("{err}"),
1773                "Parquet error: Repetition levels are required, because max repetition level = 1"
1774            );
1775        }
1776    }
1777
1778    #[test]
1779    fn test_column_writer_not_enough_values_to_write() {
1780        let page_writer = get_test_page_writer();
1781        let props = Default::default();
1782        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1783        let res = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None);
1784        assert!(res.is_err());
1785        if let Err(err) = res {
1786            assert_eq!(
1787                format!("{err}"),
1788                "Parquet error: Expected to write 4 values, but have only 2"
1789            );
1790        }
1791    }
1792
1793    #[test]
1794    fn test_column_writer_write_only_one_dictionary_page() {
1795        let page_writer = get_test_page_writer();
1796        let props = Default::default();
1797        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1798        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1799        // First page should be correctly written.
1800        writer.add_data_page().unwrap();
1801        writer.write_dictionary_page().unwrap();
1802        let err = writer.write_dictionary_page().unwrap_err().to_string();
1803        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1804    }
1805
1806    #[test]
1807    fn test_column_writer_error_when_writing_disabled_dictionary() {
1808        let page_writer = get_test_page_writer();
1809        let props = Arc::new(
1810            WriterProperties::builder()
1811                .set_dictionary_enabled(false)
1812                .build(),
1813        );
1814        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1815        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1816        let err = writer.write_dictionary_page().unwrap_err().to_string();
1817        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1818    }
1819
    /// BOOLEAN columns must ignore `set_dictionary_enabled(true)` and write
    /// plain-encoded data with no dictionary page.
    #[test]
    fn test_column_writer_boolean_type_does_not_support_dictionary() {
        let page_writer = get_test_page_writer();
        let props = Arc::new(
            WriterProperties::builder()
                .set_dictionary_enabled(true)
                .build(),
        );
        let mut writer = get_test_column_writer::<BoolType>(page_writer, 0, 0, props);
        writer
            .write_batch(&[true, false, true, false], None, None)
            .unwrap();

        let r = writer.close().unwrap();
        // PlainEncoder uses bit writer to write boolean values, which all fit into 1
        // byte.
        assert_eq!(r.bytes_written, 1);
        assert_eq!(r.rows_written, 4);

        let metadata = r.metadata;
        // RLE_DICTIONARY must not appear despite dictionaries being enabled.
        assert_eq!(
            metadata.encodings().collect::<Vec<_>>(),
            vec![Encoding::PLAIN, Encoding::RLE]
        );
        assert_eq!(metadata.num_values(), 4); // just values
        assert_eq!(metadata.dictionary_page_offset(), None);
    }
1847
    /// Default-encoding matrix for BOOLEAN across writer version (v1/v2) ×
    /// dictionary on/off: v1 always writes PLAIN data pages, v2 writes RLE.
    #[test]
    fn test_column_writer_default_encoding_support_bool() {
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[true, false],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[true, false],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[true, false],
            None,
            &[Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
        );
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[true, false],
            None,
            &[Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
        );
    }
1883
    /// Default-encoding matrix for INT32 across writer version × dictionary
    /// on/off: dictionaries yield RLE_DICTIONARY data pages; without them v1
    /// falls back to PLAIN and v2 to DELTA_BINARY_PACKED.
    #[test]
    fn test_column_writer_default_encoding_support_int32() {
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1, 2],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1, 2],
            None,
            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BINARY_PACKED,
                1,
            )],
        );
    }
1929
    /// Default-encoding matrix for INT64 — mirrors the INT32 expectations:
    /// dictionary pages when enabled, PLAIN (v1) / DELTA_BINARY_PACKED (v2)
    /// fallbacks otherwise.
    #[test]
    fn test_column_writer_default_encoding_support_int64() {
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1, 2],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1, 2],
            None,
            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BINARY_PACKED,
                1,
            )],
        );
    }
1975
    /// Default-encoding matrix for INT96: dictionaries when enabled, and a
    /// PLAIN fallback in both writer versions (INT96 has no delta encoding).
    #[test]
    fn test_column_writer_default_encoding_support_int96() {
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_1_0,
            true,
            &[Int96::from(vec![1, 2, 3])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[Int96::from(vec![1, 2, 3])],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_2_0,
            true,
            &[Int96::from(vec![1, 2, 3])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_2_0,
            false,
            &[Int96::from(vec![1, 2, 3])],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
        );
    }
2017
    /// Default-encoding matrix for FLOAT: dictionaries when enabled, PLAIN
    /// fallback in both writer versions.
    #[test]
    fn test_column_writer_default_encoding_support_float() {
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
        );
    }
2059
    /// Default-encoding matrix for DOUBLE — mirrors the FLOAT expectations.
    #[test]
    fn test_column_writer_default_encoding_support_double() {
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
        );
    }
2101
    /// Default-encoding matrix for BYTE_ARRAY: dictionaries when enabled,
    /// PLAIN fallback in v1 and DELTA_BYTE_ARRAY fallback in v2.
    #[test]
    fn test_column_writer_default_encoding_support_byte_array() {
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[ByteArray::from(vec![1u8])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[ByteArray::from(vec![1u8])],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[ByteArray::from(vec![1u8])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[ByteArray::from(vec![1u8])],
            None,
            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BYTE_ARRAY,
                1,
            )],
        );
    }
2147
    /// Default-encoding matrix for FIXED_LEN_BYTE_ARRAY: v1 never uses
    /// dictionaries (even when enabled); v2 uses them when enabled and falls
    /// back to DELTA_BYTE_ARRAY otherwise.
    #[test]
    fn test_column_writer_default_encoding_support_fixed_len_byte_array() {
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[ByteArray::from(vec![1u8]).into()],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BYTE_ARRAY,
                1,
            )],
        );
    }
2190
2191    #[test]
2192    fn test_column_writer_check_metadata() {
2193        let page_writer = get_test_page_writer();
2194        let props = Default::default();
2195        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2196        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2197
2198        let r = writer.close().unwrap();
2199        assert_eq!(r.bytes_written, 20);
2200        assert_eq!(r.rows_written, 4);
2201
2202        let metadata = r.metadata;
2203        assert_eq!(
2204            metadata.encodings().collect::<Vec<_>>(),
2205            vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2206        );
2207        assert_eq!(metadata.num_values(), 4);
2208        assert_eq!(metadata.compressed_size(), 20);
2209        assert_eq!(metadata.uncompressed_size(), 20);
2210        assert_eq!(metadata.data_page_offset(), 0);
2211        assert_eq!(metadata.dictionary_page_offset(), Some(0));
2212        if let Some(stats) = metadata.statistics() {
2213            assert_eq!(stats.null_count_opt(), Some(0));
2214            assert_eq!(stats.distinct_count_opt(), None);
2215            if let Statistics::Int32(stats) = stats {
2216                assert_eq!(stats.min_opt().unwrap(), &1);
2217                assert_eq!(stats.max_opt().unwrap(), &4);
2218            } else {
2219                panic!("expecting Statistics::Int32");
2220            }
2221        } else {
2222            panic!("metadata missing statistics");
2223        }
2224    }
2225
    /// Decimal BYTE_ARRAY statistics must use signed big-endian comparison:
    /// the 0xFF-prefixed (negative) values sort below the zero-prefixed
    /// (non-negative) ones, so min is the most negative input and max the
    /// largest non-negative input — not the lexicographically extreme bytes.
    #[test]
    fn test_column_writer_check_byte_array_min_max() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
        writer
            .write_batch(
                &[
                    ByteArray::from(vec![
                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
                        35u8, 231u8, 90u8, 0u8, 0u8,
                    ]),
                    ByteArray::from(vec![
                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8, 62u8, 146u8,
                        152u8, 177u8, 56u8, 0u8, 0u8,
                    ]),
                    ByteArray::from(vec![
                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
                        0u8,
                    ]),
                    ByteArray::from(vec![
                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
                        44u8, 0u8, 0u8,
                    ]),
                ],
                None,
                None,
            )
            .unwrap();
        let metadata = writer.close().unwrap().metadata;
        if let Some(stats) = metadata.statistics() {
            if let Statistics::ByteArray(stats) = stats {
                // Min is the first (most negative) input.
                assert_eq!(
                    stats.min_opt().unwrap(),
                    &ByteArray::from(vec![
                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
                        35u8, 231u8, 90u8, 0u8, 0u8,
                    ])
                );
                // Max is the largest non-negative input.
                assert_eq!(
                    stats.max_opt().unwrap(),
                    &ByteArray::from(vec![
                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
                        44u8, 0u8, 0u8,
                    ])
                );
            } else {
                panic!("expecting Statistics::ByteArray");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }
2279
2280    #[test]
2281    fn test_column_writer_uint32_converted_type_min_max() {
2282        let page_writer = get_test_page_writer();
2283        let props = Default::default();
2284        let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
2285            page_writer,
2286            0,
2287            0,
2288            props,
2289        );
2290        writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
2291        let metadata = writer.close().unwrap().metadata;
2292        if let Some(stats) = metadata.statistics() {
2293            if let Statistics::Int32(stats) = stats {
2294                assert_eq!(stats.min_opt().unwrap(), &0,);
2295                assert_eq!(stats.max_opt().unwrap(), &5,);
2296            } else {
2297                panic!("expecting Statistics::Int32");
2298            }
2299        } else {
2300            panic!("metadata missing statistics");
2301        }
2302    }
2303
2304    #[test]
2305    fn test_column_writer_precalculated_statistics() {
2306        let page_writer = get_test_page_writer();
2307        let props = Arc::new(
2308            WriterProperties::builder()
2309                .set_statistics_enabled(EnabledStatistics::Chunk)
2310                .build(),
2311        );
2312        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2313        writer
2314            .write_batch_with_statistics(
2315                &[1, 2, 3, 4],
2316                None,
2317                None,
2318                Some(&-17),
2319                Some(&9000),
2320                Some(55),
2321            )
2322            .unwrap();
2323
2324        let r = writer.close().unwrap();
2325        assert_eq!(r.bytes_written, 20);
2326        assert_eq!(r.rows_written, 4);
2327
2328        let metadata = r.metadata;
2329        assert_eq!(
2330            metadata.encodings().collect::<Vec<_>>(),
2331            vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2332        );
2333        assert_eq!(metadata.num_values(), 4);
2334        assert_eq!(metadata.compressed_size(), 20);
2335        assert_eq!(metadata.uncompressed_size(), 20);
2336        assert_eq!(metadata.data_page_offset(), 0);
2337        assert_eq!(metadata.dictionary_page_offset(), Some(0));
2338        if let Some(stats) = metadata.statistics() {
2339            assert_eq!(stats.null_count_opt(), Some(0));
2340            assert_eq!(stats.distinct_count_opt().unwrap_or(0), 55);
2341            if let Statistics::Int32(stats) = stats {
2342                assert_eq!(stats.min_opt().unwrap(), &-17);
2343                assert_eq!(stats.max_opt().unwrap(), &9000);
2344            } else {
2345                panic!("expecting Statistics::Int32");
2346            }
2347        } else {
2348            panic!("metadata missing statistics");
2349        }
2350    }
2351
2352    #[test]
2353    fn test_mixed_precomputed_statistics() {
2354        let mut buf = Vec::with_capacity(100);
2355        let mut write = TrackedWrite::new(&mut buf);
2356        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2357        let props = Arc::new(
2358            WriterProperties::builder()
2359                .set_write_page_header_statistics(true)
2360                .build(),
2361        );
2362        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2363
2364        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2365        writer
2366            .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3))
2367            .unwrap();
2368
2369        let r = writer.close().unwrap();
2370
2371        let stats = r.metadata.statistics().unwrap();
2372        assert_eq!(stats.min_bytes_opt().unwrap(), 1_i32.to_le_bytes());
2373        assert_eq!(stats.max_bytes_opt().unwrap(), 7_i32.to_le_bytes());
2374        assert_eq!(stats.null_count_opt(), Some(0));
2375        assert!(stats.distinct_count_opt().is_none());
2376
2377        drop(write);
2378
2379        let props = ReaderProperties::builder()
2380            .set_backward_compatible_lz4(false)
2381            .set_read_page_statistics(true)
2382            .build();
2383        let reader = SerializedPageReader::new_with_properties(
2384            Arc::new(Bytes::from(buf)),
2385            &r.metadata,
2386            r.rows_written as usize,
2387            None,
2388            Arc::new(props),
2389        )
2390        .unwrap();
2391
2392        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2393        assert_eq!(pages.len(), 2);
2394
2395        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2396        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);
2397
2398        let page_statistics = pages[1].statistics().unwrap();
2399        assert_eq!(
2400            page_statistics.min_bytes_opt().unwrap(),
2401            1_i32.to_le_bytes()
2402        );
2403        assert_eq!(
2404            page_statistics.max_bytes_opt().unwrap(),
2405            7_i32.to_le_bytes()
2406        );
2407        assert_eq!(page_statistics.null_count_opt(), Some(0));
2408        assert!(page_statistics.distinct_count_opt().is_none());
2409    }
2410
2411    #[test]
2412    fn test_disabled_statistics() {
2413        let mut buf = Vec::with_capacity(100);
2414        let mut write = TrackedWrite::new(&mut buf);
2415        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2416        let props = WriterProperties::builder()
2417            .set_statistics_enabled(EnabledStatistics::None)
2418            .set_writer_version(WriterVersion::PARQUET_2_0)
2419            .build();
2420        let props = Arc::new(props);
2421
2422        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2423        writer
2424            .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
2425            .unwrap();
2426
2427        let r = writer.close().unwrap();
2428        assert!(r.metadata.statistics().is_none());
2429
2430        drop(write);
2431
2432        let props = ReaderProperties::builder()
2433            .set_backward_compatible_lz4(false)
2434            .build();
2435        let reader = SerializedPageReader::new_with_properties(
2436            Arc::new(Bytes::from(buf)),
2437            &r.metadata,
2438            r.rows_written as usize,
2439            None,
2440            Arc::new(props),
2441        )
2442        .unwrap();
2443
2444        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2445        assert_eq!(pages.len(), 2);
2446
2447        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2448        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2);
2449
2450        match &pages[1] {
2451            Page::DataPageV2 {
2452                num_values,
2453                num_nulls,
2454                num_rows,
2455                statistics,
2456                ..
2457            } => {
2458                assert_eq!(*num_values, 6);
2459                assert_eq!(*num_nulls, 2);
2460                assert_eq!(*num_rows, 6);
2461                assert!(statistics.is_none());
2462            }
2463            _ => unreachable!(),
2464        }
2465    }
2466
2467    #[test]
2468    fn test_column_writer_empty_column_roundtrip() {
2469        let props = Default::default();
2470        column_roundtrip::<Int32Type>(props, &[], None, None);
2471    }
2472
2473    #[test]
2474    fn test_column_writer_non_nullable_values_roundtrip() {
2475        let props = Default::default();
2476        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
2477    }
2478
2479    #[test]
2480    fn test_column_writer_nullable_non_repeated_values_roundtrip() {
2481        let props = Default::default();
2482        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 0);
2483    }
2484
2485    #[test]
2486    fn test_column_writer_nullable_repeated_values_roundtrip() {
2487        let props = Default::default();
2488        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2489    }
2490
2491    #[test]
2492    fn test_column_writer_dictionary_fallback_small_data_page() {
2493        let props = WriterProperties::builder()
2494            .set_dictionary_page_size_limit(32)
2495            .set_data_page_size_limit(32)
2496            .build();
2497        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2498    }
2499
2500    #[test]
2501    fn test_column_writer_small_write_batch_size() {
2502        for i in &[1usize, 2, 5, 10, 11, 1023] {
2503            let props = WriterProperties::builder().set_write_batch_size(*i).build();
2504
2505            column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2506        }
2507    }
2508
2509    #[test]
2510    fn test_column_writer_dictionary_disabled_v1() {
2511        let props = WriterProperties::builder()
2512            .set_writer_version(WriterVersion::PARQUET_1_0)
2513            .set_dictionary_enabled(false)
2514            .build();
2515        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2516    }
2517
2518    #[test]
2519    fn test_column_writer_dictionary_disabled_v2() {
2520        let props = WriterProperties::builder()
2521            .set_writer_version(WriterVersion::PARQUET_2_0)
2522            .set_dictionary_enabled(false)
2523            .build();
2524        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2525    }
2526
2527    #[test]
2528    fn test_column_writer_compression_v1() {
2529        let props = WriterProperties::builder()
2530            .set_writer_version(WriterVersion::PARQUET_1_0)
2531            .set_compression(Compression::SNAPPY)
2532            .build();
2533        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2534    }
2535
2536    #[test]
2537    fn test_column_writer_compression_v2() {
2538        let props = WriterProperties::builder()
2539            .set_writer_version(WriterVersion::PARQUET_2_0)
2540            .set_compression(Compression::SNAPPY)
2541            .build();
2542        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2543    }
2544
    #[test]
    fn test_column_writer_v2_compression_ratio_threshold() {
        // Writes one column of highly compressible data and reports whether
        // the resulting v2 data page was kept compressed under the given
        // compression-ratio threshold.
        fn write_v2_page(threshold: f64) -> bool {
            let mut buf = Vec::with_capacity(4096);
            let mut write = TrackedWrite::new(&mut buf);
            let page_writer = Box::new(SerializedPageWriter::new(&mut write));
            let props = Arc::new(
                WriterProperties::builder()
                    .set_writer_version(WriterVersion::PARQUET_2_0)
                    .set_compression(Compression::SNAPPY)
                    // Disable dictionary so the values are encoded in the
                    // data page itself and the compressor has work to do.
                    .set_dictionary_enabled(false)
                    .set_data_page_v2_compression_ratio_threshold(threshold)
                    .build(),
            );

            // 4096 identical values compress extremely well under SNAPPY.
            let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
            let values: Vec<i32> = vec![42; 4096];
            writer.write_batch(&values, None, None).unwrap();
            let r = writer.close().unwrap();
            // `write` mutably borrows `buf`; drop it before reading `buf`.
            drop(write);

            let reader_props = ReaderProperties::builder()
                .set_backward_compatible_lz4(false)
                .build();
            let reader = SerializedPageReader::new_with_properties(
                Arc::new(Bytes::from(buf)),
                &r.metadata,
                r.rows_written as usize,
                None,
                Arc::new(reader_props),
            )
            .unwrap();
            let pages = reader.collect::<Result<Vec<_>>>().unwrap();
            let data_page = pages
                .iter()
                .find(|p| p.page_type() == PageType::DATA_PAGE_V2)
                .expect("expected a v2 data page");
            // The v2 page header records whether the payload is compressed.
            match data_page {
                Page::DataPageV2 { is_compressed, .. } => *is_compressed,
                _ => unreachable!(),
            }
        }

        // Default threshold keeps the compressed buffer for constant data.
        assert!(write_v2_page(1.0));
        // A strict threshold (require >1000x reduction) discards it.
        assert!(!write_v2_page(0.001));
    }
2593
    #[test]
    fn test_column_writer_add_data_pages_with_dict() {
        // ARROW-5129: Test verifies that we add data page in case of dictionary encoding
        // and no fallback occurred so far.
        let mut file = tempfile::tempfile().unwrap();
        let mut write = TrackedWrite::new(&mut file);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let props = Arc::new(
            WriterProperties::builder()
                // Tiny page limit forces a page split partway through the data
                .set_data_page_size_limit(10)
                .set_write_batch_size(3) // write 3 values at a time
                .build(),
        );
        let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer.write_batch(data, None, None).unwrap();
        let r = writer.close().unwrap();

        // `write` mutably borrows `file`; drop it before re-reading the file.
        drop(write);

        // Read pages and check the sequence
        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let mut page_reader = Box::new(
            SerializedPageReader::new_with_properties(
                Arc::new(file),
                &r.metadata,
                r.rows_written as usize,
                None,
                Arc::new(props),
            )
            .unwrap(),
        );
        // Collect (page type, value count, buffer length) for every page.
        let mut res = Vec::new();
        while let Some(page) = page_reader.get_next_page().unwrap() {
            res.push((page.page_type(), page.num_values(), page.buffer().len()));
        }
        // One dictionary page, then two dictionary-encoded data pages: the
        // 10-byte limit splits the data after the first 9 values.
        assert_eq!(
            res,
            vec![
                (PageType::DICTIONARY_PAGE, 10, 40),
                (PageType::DATA_PAGE, 9, 10),
                (PageType::DATA_PAGE, 1, 3),
            ]
        );
        // Encoding stats mirror the observed page sequence.
        assert_eq!(
            r.metadata.page_encoding_stats(),
            Some(&vec![
                PageEncodingStats {
                    page_type: PageType::DICTIONARY_PAGE,
                    encoding: Encoding::PLAIN,
                    count: 1
                },
                PageEncodingStats {
                    page_type: PageType::DATA_PAGE,
                    encoding: Encoding::RLE_DICTIONARY,
                    count: 2,
                }
            ])
        );
    }
2656
2657    #[test]
2658    fn test_column_writer_column_data_page_size_limit() {
2659        let props = Arc::new(
2660            WriterProperties::builder()
2661                .set_writer_version(WriterVersion::PARQUET_1_0)
2662                .set_dictionary_enabled(false)
2663                .set_data_page_size_limit(1000)
2664                .set_column_data_page_size_limit(ColumnPath::from("col"), 10)
2665                .set_write_batch_size(3)
2666                .build(),
2667        );
2668        let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
2669
2670        let col_values =
2671            write_and_collect_page_values(ColumnPath::from("col"), Arc::clone(&props), data);
2672        let other_values = write_and_collect_page_values(ColumnPath::from("other"), props, data);
2673
2674        assert_eq!(col_values, vec![3, 3, 3, 1]);
2675        assert_eq!(other_values, vec![10]);
2676    }
2677
2678    #[test]
2679    fn test_bool_statistics() {
2680        let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
2681        // Booleans have an unsigned sort order and so are not compatible
2682        // with the deprecated `min` and `max` statistics
2683        assert!(!stats.is_min_max_backwards_compatible());
2684        if let Statistics::Boolean(stats) = stats {
2685            assert_eq!(stats.min_opt().unwrap(), &false);
2686            assert_eq!(stats.max_opt().unwrap(), &true);
2687        } else {
2688            panic!("expecting Statistics::Boolean, got {stats:?}");
2689        }
2690    }
2691
2692    #[test]
2693    fn test_int32_statistics() {
2694        let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
2695        assert!(stats.is_min_max_backwards_compatible());
2696        if let Statistics::Int32(stats) = stats {
2697            assert_eq!(stats.min_opt().unwrap(), &-2);
2698            assert_eq!(stats.max_opt().unwrap(), &3);
2699        } else {
2700            panic!("expecting Statistics::Int32, got {stats:?}");
2701        }
2702    }
2703
2704    #[test]
2705    fn test_int64_statistics() {
2706        let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
2707        assert!(stats.is_min_max_backwards_compatible());
2708        if let Statistics::Int64(stats) = stats {
2709            assert_eq!(stats.min_opt().unwrap(), &-2);
2710            assert_eq!(stats.max_opt().unwrap(), &3);
2711        } else {
2712            panic!("expecting Statistics::Int64, got {stats:?}");
2713        }
2714    }
2715
2716    #[test]
2717    fn test_int96_statistics() {
2718        let input = vec![
2719            Int96::from(vec![1, 20, 30]),
2720            Int96::from(vec![3, 20, 10]),
2721            Int96::from(vec![0, 20, 30]),
2722            Int96::from(vec![2, 20, 30]),
2723        ]
2724        .into_iter()
2725        .collect::<Vec<Int96>>();
2726
2727        let stats = statistics_roundtrip::<Int96Type>(&input);
2728        assert!(!stats.is_min_max_backwards_compatible());
2729        if let Statistics::Int96(stats) = stats {
2730            assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
2731            assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![2, 20, 30]));
2732        } else {
2733            panic!("expecting Statistics::Int96, got {stats:?}");
2734        }
2735    }
2736
2737    #[test]
2738    fn test_float_statistics() {
2739        let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]);
2740        assert!(stats.is_min_max_backwards_compatible());
2741        if let Statistics::Float(stats) = stats {
2742            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2743            assert_eq!(stats.max_opt().unwrap(), &3.0);
2744        } else {
2745            panic!("expecting Statistics::Float, got {stats:?}");
2746        }
2747    }
2748
2749    #[test]
2750    fn test_double_statistics() {
2751        let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]);
2752        assert!(stats.is_min_max_backwards_compatible());
2753        if let Statistics::Double(stats) = stats {
2754            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2755            assert_eq!(stats.max_opt().unwrap(), &3.0);
2756        } else {
2757            panic!("expecting Statistics::Double, got {stats:?}");
2758        }
2759    }
2760
2761    #[test]
2762    fn test_byte_array_statistics() {
2763        let input = ["aawaa", "zz", "aaw", "m", "qrs"]
2764            .iter()
2765            .map(|&s| s.into())
2766            .collect::<Vec<_>>();
2767
2768        let stats = statistics_roundtrip::<ByteArrayType>(&input);
2769        assert!(!stats.is_min_max_backwards_compatible());
2770        if let Statistics::ByteArray(stats) = stats {
2771            assert_eq!(stats.min_opt().unwrap(), &ByteArray::from("aaw"));
2772            assert_eq!(stats.max_opt().unwrap(), &ByteArray::from("zz"));
2773        } else {
2774            panic!("expecting Statistics::ByteArray, got {stats:?}");
2775        }
2776    }
2777
2778    #[test]
2779    fn test_fixed_len_byte_array_statistics() {
2780        let input = ["aawaa", "zz   ", "aaw  ", "m    ", "qrs  "]
2781            .iter()
2782            .map(|&s| ByteArray::from(s).into())
2783            .collect::<Vec<_>>();
2784
2785        let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
2786        assert!(!stats.is_min_max_backwards_compatible());
2787        if let Statistics::FixedLenByteArray(stats) = stats {
2788            let expected_min: FixedLenByteArray = ByteArray::from("aaw  ").into();
2789            assert_eq!(stats.min_opt().unwrap(), &expected_min);
2790            let expected_max: FixedLenByteArray = ByteArray::from("zz   ").into();
2791            assert_eq!(stats.max_opt().unwrap(), &expected_max);
2792        } else {
2793            panic!("expecting Statistics::FixedLenByteArray, got {stats:?}");
2794        }
2795    }
2796
2797    #[test]
2798    fn test_column_writer_check_float16_min_max() {
2799        let input = [
2800            -f16::ONE,
2801            f16::from_f32(3.0),
2802            -f16::from_f32(2.0),
2803            f16::from_f32(2.0),
2804        ]
2805        .into_iter()
2806        .map(|s| ByteArray::from(s).into())
2807        .collect::<Vec<_>>();
2808
2809        let stats = float16_statistics_roundtrip(&input);
2810        assert!(stats.is_min_max_backwards_compatible());
2811        assert_eq!(
2812            stats.min_opt().unwrap(),
2813            &ByteArray::from(-f16::from_f32(2.0))
2814        );
2815        assert_eq!(
2816            stats.max_opt().unwrap(),
2817            &ByteArray::from(f16::from_f32(3.0))
2818        );
2819    }
2820
2821    #[test]
2822    fn test_column_writer_check_float16_nan_middle() {
2823        let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2824            .into_iter()
2825            .map(|s| ByteArray::from(s).into())
2826            .collect::<Vec<_>>();
2827
2828        let stats = float16_statistics_roundtrip(&input);
2829        assert!(stats.is_min_max_backwards_compatible());
2830        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2831        assert_eq!(
2832            stats.max_opt().unwrap(),
2833            &ByteArray::from(f16::ONE + f16::ONE)
2834        );
2835    }
2836
2837    #[test]
2838    fn test_float16_statistics_nan_middle() {
2839        let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2840            .into_iter()
2841            .map(|s| ByteArray::from(s).into())
2842            .collect::<Vec<_>>();
2843
2844        let stats = float16_statistics_roundtrip(&input);
2845        assert!(stats.is_min_max_backwards_compatible());
2846        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2847        assert_eq!(
2848            stats.max_opt().unwrap(),
2849            &ByteArray::from(f16::ONE + f16::ONE)
2850        );
2851    }
2852
2853    #[test]
2854    fn test_float16_statistics_nan_start() {
2855        let input = [f16::NAN, f16::ONE, f16::ONE + f16::ONE]
2856            .into_iter()
2857            .map(|s| ByteArray::from(s).into())
2858            .collect::<Vec<_>>();
2859
2860        let stats = float16_statistics_roundtrip(&input);
2861        assert!(stats.is_min_max_backwards_compatible());
2862        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2863        assert_eq!(
2864            stats.max_opt().unwrap(),
2865            &ByteArray::from(f16::ONE + f16::ONE)
2866        );
2867    }
2868
2869    #[test]
2870    fn test_float16_statistics_nan_only() {
2871        let input = [f16::NAN, f16::NAN]
2872            .into_iter()
2873            .map(|s| ByteArray::from(s).into())
2874            .collect::<Vec<_>>();
2875
2876        let stats = float16_statistics_roundtrip(&input);
2877        assert!(stats.min_bytes_opt().is_none());
2878        assert!(stats.max_bytes_opt().is_none());
2879        assert!(stats.is_min_max_backwards_compatible());
2880    }
2881
2882    #[test]
2883    fn test_float16_statistics_zero_only() {
2884        let input = [f16::ZERO]
2885            .into_iter()
2886            .map(|s| ByteArray::from(s).into())
2887            .collect::<Vec<_>>();
2888
2889        let stats = float16_statistics_roundtrip(&input);
2890        assert!(stats.is_min_max_backwards_compatible());
2891        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2892        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2893    }
2894
2895    #[test]
2896    fn test_float16_statistics_neg_zero_only() {
2897        let input = [f16::NEG_ZERO]
2898            .into_iter()
2899            .map(|s| ByteArray::from(s).into())
2900            .collect::<Vec<_>>();
2901
2902        let stats = float16_statistics_roundtrip(&input);
2903        assert!(stats.is_min_max_backwards_compatible());
2904        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2905        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2906    }
2907
2908    #[test]
2909    fn test_float16_statistics_zero_min() {
2910        let input = [f16::ZERO, f16::ONE, f16::NAN, f16::PI]
2911            .into_iter()
2912            .map(|s| ByteArray::from(s).into())
2913            .collect::<Vec<_>>();
2914
2915        let stats = float16_statistics_roundtrip(&input);
2916        assert!(stats.is_min_max_backwards_compatible());
2917        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2918        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::PI));
2919    }
2920
2921    #[test]
2922    fn test_float16_statistics_neg_zero_max() {
2923        let input = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, -f16::PI]
2924            .into_iter()
2925            .map(|s| ByteArray::from(s).into())
2926            .collect::<Vec<_>>();
2927
2928        let stats = float16_statistics_roundtrip(&input);
2929        assert!(stats.is_min_max_backwards_compatible());
2930        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-f16::PI));
2931        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2932    }
2933
2934    #[test]
2935    fn test_float_statistics_nan_middle() {
2936        let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
2937        assert!(stats.is_min_max_backwards_compatible());
2938        if let Statistics::Float(stats) = stats {
2939            assert_eq!(stats.min_opt().unwrap(), &1.0);
2940            assert_eq!(stats.max_opt().unwrap(), &2.0);
2941        } else {
2942            panic!("expecting Statistics::Float");
2943        }
2944    }
2945
2946    #[test]
2947    fn test_float_statistics_nan_start() {
2948        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
2949        assert!(stats.is_min_max_backwards_compatible());
2950        if let Statistics::Float(stats) = stats {
2951            assert_eq!(stats.min_opt().unwrap(), &1.0);
2952            assert_eq!(stats.max_opt().unwrap(), &2.0);
2953        } else {
2954            panic!("expecting Statistics::Float");
2955        }
2956    }
2957
2958    #[test]
2959    fn test_float_statistics_nan_only() {
2960        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, f32::NAN]);
2961        assert!(stats.min_bytes_opt().is_none());
2962        assert!(stats.max_bytes_opt().is_none());
2963        assert!(stats.is_min_max_backwards_compatible());
2964        assert!(matches!(stats, Statistics::Float(_)));
2965    }
2966
2967    #[test]
2968    fn test_float_statistics_zero_only() {
2969        let stats = statistics_roundtrip::<FloatType>(&[0.0]);
2970        assert!(stats.is_min_max_backwards_compatible());
2971        if let Statistics::Float(stats) = stats {
2972            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2973            assert!(stats.min_opt().unwrap().is_sign_negative());
2974            assert_eq!(stats.max_opt().unwrap(), &0.0);
2975            assert!(stats.max_opt().unwrap().is_sign_positive());
2976        } else {
2977            panic!("expecting Statistics::Float");
2978        }
2979    }
2980
2981    #[test]
2982    fn test_float_statistics_neg_zero_only() {
2983        let stats = statistics_roundtrip::<FloatType>(&[-0.0]);
2984        assert!(stats.is_min_max_backwards_compatible());
2985        if let Statistics::Float(stats) = stats {
2986            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2987            assert!(stats.min_opt().unwrap().is_sign_negative());
2988            assert_eq!(stats.max_opt().unwrap(), &0.0);
2989            assert!(stats.max_opt().unwrap().is_sign_positive());
2990        } else {
2991            panic!("expecting Statistics::Float");
2992        }
2993    }
2994
2995    #[test]
2996    fn test_float_statistics_zero_min() {
2997        let stats = statistics_roundtrip::<FloatType>(&[0.0, 1.0, f32::NAN, 2.0]);
2998        assert!(stats.is_min_max_backwards_compatible());
2999        if let Statistics::Float(stats) = stats {
3000            assert_eq!(stats.min_opt().unwrap(), &-0.0);
3001            assert!(stats.min_opt().unwrap().is_sign_negative());
3002            assert_eq!(stats.max_opt().unwrap(), &2.0);
3003        } else {
3004            panic!("expecting Statistics::Float");
3005        }
3006    }
3007
3008    #[test]
3009    fn test_float_statistics_neg_zero_max() {
3010        let stats = statistics_roundtrip::<FloatType>(&[-0.0, -1.0, f32::NAN, -2.0]);
3011        assert!(stats.is_min_max_backwards_compatible());
3012        if let Statistics::Float(stats) = stats {
3013            assert_eq!(stats.min_opt().unwrap(), &-2.0);
3014            assert_eq!(stats.max_opt().unwrap(), &0.0);
3015            assert!(stats.max_opt().unwrap().is_sign_positive());
3016        } else {
3017            panic!("expecting Statistics::Float");
3018        }
3019    }
3020
3021    #[test]
3022    fn test_double_statistics_nan_middle() {
3023        let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
3024        assert!(stats.is_min_max_backwards_compatible());
3025        if let Statistics::Double(stats) = stats {
3026            assert_eq!(stats.min_opt().unwrap(), &1.0);
3027            assert_eq!(stats.max_opt().unwrap(), &2.0);
3028        } else {
3029            panic!("expecting Statistics::Double");
3030        }
3031    }
3032
3033    #[test]
3034    fn test_double_statistics_nan_start() {
3035        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
3036        assert!(stats.is_min_max_backwards_compatible());
3037        if let Statistics::Double(stats) = stats {
3038            assert_eq!(stats.min_opt().unwrap(), &1.0);
3039            assert_eq!(stats.max_opt().unwrap(), &2.0);
3040        } else {
3041            panic!("expecting Statistics::Double");
3042        }
3043    }
3044
3045    #[test]
3046    fn test_double_statistics_nan_only() {
3047        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, f64::NAN]);
3048        assert!(stats.min_bytes_opt().is_none());
3049        assert!(stats.max_bytes_opt().is_none());
3050        assert!(matches!(stats, Statistics::Double(_)));
3051        assert!(stats.is_min_max_backwards_compatible());
3052    }
3053
3054    #[test]
3055    fn test_double_statistics_zero_only() {
3056        let stats = statistics_roundtrip::<DoubleType>(&[0.0]);
3057        assert!(stats.is_min_max_backwards_compatible());
3058        if let Statistics::Double(stats) = stats {
3059            assert_eq!(stats.min_opt().unwrap(), &-0.0);
3060            assert!(stats.min_opt().unwrap().is_sign_negative());
3061            assert_eq!(stats.max_opt().unwrap(), &0.0);
3062            assert!(stats.max_opt().unwrap().is_sign_positive());
3063        } else {
3064            panic!("expecting Statistics::Double");
3065        }
3066    }
3067
3068    #[test]
3069    fn test_double_statistics_neg_zero_only() {
3070        let stats = statistics_roundtrip::<DoubleType>(&[-0.0]);
3071        assert!(stats.is_min_max_backwards_compatible());
3072        if let Statistics::Double(stats) = stats {
3073            assert_eq!(stats.min_opt().unwrap(), &-0.0);
3074            assert!(stats.min_opt().unwrap().is_sign_negative());
3075            assert_eq!(stats.max_opt().unwrap(), &0.0);
3076            assert!(stats.max_opt().unwrap().is_sign_positive());
3077        } else {
3078            panic!("expecting Statistics::Double");
3079        }
3080    }
3081
3082    #[test]
3083    fn test_double_statistics_zero_min() {
3084        let stats = statistics_roundtrip::<DoubleType>(&[0.0, 1.0, f64::NAN, 2.0]);
3085        assert!(stats.is_min_max_backwards_compatible());
3086        if let Statistics::Double(stats) = stats {
3087            assert_eq!(stats.min_opt().unwrap(), &-0.0);
3088            assert!(stats.min_opt().unwrap().is_sign_negative());
3089            assert_eq!(stats.max_opt().unwrap(), &2.0);
3090        } else {
3091            panic!("expecting Statistics::Double");
3092        }
3093    }
3094
3095    #[test]
3096    fn test_double_statistics_neg_zero_max() {
3097        let stats = statistics_roundtrip::<DoubleType>(&[-0.0, -1.0, f64::NAN, -2.0]);
3098        assert!(stats.is_min_max_backwards_compatible());
3099        if let Statistics::Double(stats) = stats {
3100            assert_eq!(stats.min_opt().unwrap(), &-2.0);
3101            assert_eq!(stats.max_opt().unwrap(), &0.0);
3102            assert!(stats.max_opt().unwrap().is_sign_positive());
3103        } else {
3104            panic!("expecting Statistics::Double");
3105        }
3106    }
3107
3108    #[test]
3109    fn test_compare_greater_byte_array_decimals() {
3110        assert!(!compare_greater_byte_array_decimals(&[], &[],),);
3111        assert!(compare_greater_byte_array_decimals(&[1u8,], &[],),);
3112        assert!(!compare_greater_byte_array_decimals(&[], &[1u8,],),);
3113        assert!(compare_greater_byte_array_decimals(&[1u8,], &[0u8,],),);
3114        assert!(!compare_greater_byte_array_decimals(&[1u8,], &[1u8,],),);
3115        assert!(compare_greater_byte_array_decimals(&[1u8, 0u8,], &[0u8,],),);
3116        assert!(!compare_greater_byte_array_decimals(
3117            &[0u8, 1u8,],
3118            &[1u8, 0u8,],
3119        ),);
3120        assert!(!compare_greater_byte_array_decimals(
3121            &[255u8, 35u8, 0u8, 0u8,],
3122            &[0u8,],
3123        ),);
3124        assert!(compare_greater_byte_array_decimals(
3125            &[0u8,],
3126            &[255u8, 35u8, 0u8, 0u8,],
3127        ),);
3128    }
3129
3130    #[test]
3131    fn test_column_index_with_null_pages() {
3132        // write a single page of all nulls
3133        let page_writer = get_test_page_writer();
3134        let props = Default::default();
3135        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
3136        writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap();
3137
3138        let r = writer.close().unwrap();
3139        assert!(r.column_index.is_some());
3140        let col_idx = r.column_index.unwrap();
3141        let col_idx = match col_idx {
3142            ColumnIndexMetaData::INT32(col_idx) => col_idx,
3143            _ => panic!("wrong stats type"),
3144        };
3145        // null_pages should be true for page 0
3146        assert!(col_idx.is_null_page(0));
3147        // min and max should be empty byte arrays
3148        assert!(col_idx.min_value(0).is_none());
3149        assert!(col_idx.max_value(0).is_none());
3150        // null_counts should be defined and be 4 for page 0
3151        assert!(col_idx.null_count(0).is_some());
3152        assert_eq!(col_idx.null_count(0), Some(4));
3153        // there is no repetition so rep histogram should be absent
3154        assert!(col_idx.repetition_level_histogram(0).is_none());
3155        // definition_level_histogram should be present and should be 0:4, 1:0
3156        assert!(col_idx.definition_level_histogram(0).is_some());
3157        assert_eq!(col_idx.definition_level_histogram(0).unwrap(), &[4, 0]);
3158    }
3159
3160    #[test]
3161    fn test_column_offset_index_metadata() {
3162        // write data
3163        // and check the offset index and column index
3164        let page_writer = get_test_page_writer();
3165        let props = Default::default();
3166        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
3167        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
3168        // first page
3169        writer.flush_data_pages().unwrap();
3170        // second page
3171        writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
3172
3173        let r = writer.close().unwrap();
3174        let column_index = r.column_index.unwrap();
3175        let offset_index = r.offset_index.unwrap();
3176
3177        assert_eq!(8, r.rows_written);
3178
3179        // column index
3180        let column_index = match column_index {
3181            ColumnIndexMetaData::INT32(column_index) => column_index,
3182            _ => panic!("wrong stats type"),
3183        };
3184        assert_eq!(2, column_index.num_pages());
3185        assert_eq!(2, offset_index.page_locations.len());
3186        assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
3187        for idx in 0..2 {
3188            assert!(!column_index.is_null_page(idx));
3189            assert_eq!(0, column_index.null_count(0).unwrap());
3190        }
3191
3192        if let Some(stats) = r.metadata.statistics() {
3193            assert_eq!(stats.null_count_opt(), Some(0));
3194            assert_eq!(stats.distinct_count_opt(), None);
3195            if let Statistics::Int32(stats) = stats {
3196                // first page is [1,2,3,4]
3197                // second page is [-5,2,4,8]
3198                // note that we don't increment here, as this is a non BinaryArray type.
3199                assert_eq!(stats.min_opt(), column_index.min_value(1));
3200                assert_eq!(stats.max_opt(), column_index.max_value(1));
3201            } else {
3202                panic!("expecting Statistics::Int32");
3203            }
3204        } else {
3205            panic!("metadata missing statistics");
3206        }
3207
3208        // page location
3209        assert_eq!(0, offset_index.page_locations[0].first_row_index);
3210        assert_eq!(4, offset_index.page_locations[1].first_row_index);
3211    }
3212
    /// Verify min/max value truncation in the column index works as expected
    #[test]
    fn test_column_offset_index_metadata_truncating() {
        // Write one page of three 200-byte FIXED_LEN_BYTE_ARRAY values and
        // compare truncation in the column index vs. the column chunk stats.
        let page_writer = get_test_page_writer();
        let props = WriterProperties::builder()
            // Disable truncation of the column chunk statistics; the asserts
            // below show the column index is still truncated at its default.
            .set_statistics_truncate_length(None)
            .build()
            .into();
        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

        let mut data = vec![FixedLenByteArray::default(); 3];
        // This is the expected min value - "aaa..." (0x61 = 'a')
        data[0].set_data(Bytes::from(vec![97_u8; 200]));
        // This is the expected max value - "ppp..." (0x70 = 'p')
        data[1].set_data(Bytes::from(vec![112_u8; 200]));
        // A middle value - "bbb..." (0x62 = 'b')
        data[2].set_data(Bytes::from(vec![98_u8; 200]));

        writer.write_batch(&data, None, None).unwrap();

        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();
        let column_index = r.column_index.unwrap();
        let offset_index = r.offset_index.unwrap();

        let column_index = match column_index {
            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
            _ => panic!("wrong stats type"),
        };

        assert_eq!(3, r.rows_written);

        // column index: a single non-null page with ascending bounds
        assert_eq!(1, column_index.num_pages());
        assert_eq!(1, offset_index.page_locations.len());
        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
        assert!(!column_index.is_null_page(0));
        assert_eq!(Some(0), column_index.null_count(0));

        if let Some(stats) = r.metadata.statistics() {
            assert_eq!(stats.null_count_opt(), Some(0));
            assert_eq!(stats.distinct_count_opt(), None);
            if let Statistics::FixedLenByteArray(stats) = stats {
                let column_index_min_value = column_index.min_value(0).unwrap();
                let column_index_max_value = column_index.max_value(0).unwrap();

                // Column index stats are truncated, while the column chunk's aren't.
                assert_ne!(stats.min_bytes_opt().unwrap(), column_index_min_value);
                assert_ne!(stats.max_bytes_opt().unwrap(), column_index_max_value);

                // Both bounds are cut to the default column index truncate length (64)
                assert_eq!(
                    column_index_min_value.len(),
                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
                );
                assert_eq!(column_index_min_value, &[97_u8; 64]);
                assert_eq!(
                    column_index_max_value.len(),
                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
                );

                // We expect the last byte to be incremented so the truncated
                // max remains a valid upper bound
                assert_eq!(
                    *column_index_max_value.last().unwrap(),
                    *column_index_max_value.first().unwrap() + 1
                );
            } else {
                panic!("expecting Statistics::FixedLenByteArray");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }
3287
3288    #[test]
3289    fn test_column_offset_index_truncating_spec_example() {
3290        // write data
3291        // and check the offset index and column index
3292        let page_writer = get_test_page_writer();
3293
3294        // Truncate values at 1 byte
3295        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3296        let props = Arc::new(builder.build());
3297        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3298
3299        let mut data = vec![FixedLenByteArray::default(); 1];
3300        // This is the expected min value
3301        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3302
3303        writer.write_batch(&data, None, None).unwrap();
3304
3305        writer.flush_data_pages().unwrap();
3306
3307        let r = writer.close().unwrap();
3308        let column_index = r.column_index.unwrap();
3309        let offset_index = r.offset_index.unwrap();
3310
3311        let column_index = match column_index {
3312            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3313            _ => panic!("wrong stats type"),
3314        };
3315
3316        assert_eq!(1, r.rows_written);
3317
3318        // column index
3319        assert_eq!(1, column_index.num_pages());
3320        assert_eq!(1, offset_index.page_locations.len());
3321        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
3322        assert!(!column_index.is_null_page(0));
3323        assert_eq!(Some(0), column_index.null_count(0));
3324
3325        if let Some(stats) = r.metadata.statistics() {
3326            assert_eq!(stats.null_count_opt(), Some(0));
3327            assert_eq!(stats.distinct_count_opt(), None);
3328            if let Statistics::FixedLenByteArray(_stats) = stats {
3329                let column_index_min_value = column_index.min_value(0).unwrap();
3330                let column_index_max_value = column_index.max_value(0).unwrap();
3331
3332                assert_eq!(column_index_min_value.len(), 1);
3333                assert_eq!(column_index_max_value.len(), 1);
3334
3335                assert_eq!("B".as_bytes(), column_index_min_value);
3336                assert_eq!("C".as_bytes(), column_index_max_value);
3337
3338                assert_ne!(column_index_min_value, stats.min_bytes_opt().unwrap());
3339                assert_ne!(column_index_max_value, stats.max_bytes_opt().unwrap());
3340            } else {
3341                panic!("expecting Statistics::FixedLenByteArray");
3342            }
3343        } else {
3344            panic!("metadata missing statistics");
3345        }
3346    }
3347
3348    #[test]
3349    fn test_float16_min_max_no_truncation() {
3350        // Even if we set truncation to occur at 1 byte, we should not truncate for Float16
3351        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3352        let props = Arc::new(builder.build());
3353        let page_writer = get_test_page_writer();
3354        let mut writer = get_test_float16_column_writer(page_writer, props);
3355
3356        let expected_value = f16::PI.to_le_bytes().to_vec();
3357        let data = vec![ByteArray::from(expected_value.clone()).into()];
3358        writer.write_batch(&data, None, None).unwrap();
3359        writer.flush_data_pages().unwrap();
3360
3361        let r = writer.close().unwrap();
3362
3363        // stats should still be written
3364        // ensure bytes weren't truncated for column index
3365        let column_index = r.column_index.unwrap();
3366        let column_index = match column_index {
3367            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3368            _ => panic!("wrong stats type"),
3369        };
3370        let column_index_min_bytes = column_index.min_value(0).unwrap();
3371        let column_index_max_bytes = column_index.max_value(0).unwrap();
3372        assert_eq!(expected_value, column_index_min_bytes);
3373        assert_eq!(expected_value, column_index_max_bytes);
3374
3375        // ensure bytes weren't truncated for statistics
3376        let stats = r.metadata.statistics().unwrap();
3377        if let Statistics::FixedLenByteArray(stats) = stats {
3378            let stats_min_bytes = stats.min_bytes_opt().unwrap();
3379            let stats_max_bytes = stats.max_bytes_opt().unwrap();
3380            assert_eq!(expected_value, stats_min_bytes);
3381            assert_eq!(expected_value, stats_max_bytes);
3382        } else {
3383            panic!("expecting Statistics::FixedLenByteArray");
3384        }
3385    }
3386
3387    #[test]
3388    fn test_decimal_min_max_no_truncation() {
3389        // Even if we set truncation to occur at 1 byte, we should not truncate for Decimal
3390        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3391        let props = Arc::new(builder.build());
3392        let page_writer = get_test_page_writer();
3393        let mut writer =
3394            get_test_decimals_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3395
3396        let expected_value = vec![
3397            255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8,
3398            231u8, 90u8, 0u8, 0u8,
3399        ];
3400        let data = vec![ByteArray::from(expected_value.clone()).into()];
3401        writer.write_batch(&data, None, None).unwrap();
3402        writer.flush_data_pages().unwrap();
3403
3404        let r = writer.close().unwrap();
3405
3406        // stats should still be written
3407        // ensure bytes weren't truncated for column index
3408        let column_index = r.column_index.unwrap();
3409        let column_index = match column_index {
3410            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3411            _ => panic!("wrong stats type"),
3412        };
3413        let column_index_min_bytes = column_index.min_value(0).unwrap();
3414        let column_index_max_bytes = column_index.max_value(0).unwrap();
3415        assert_eq!(expected_value, column_index_min_bytes);
3416        assert_eq!(expected_value, column_index_max_bytes);
3417
3418        // ensure bytes weren't truncated for statistics
3419        let stats = r.metadata.statistics().unwrap();
3420        if let Statistics::FixedLenByteArray(stats) = stats {
3421            let stats_min_bytes = stats.min_bytes_opt().unwrap();
3422            let stats_max_bytes = stats.max_bytes_opt().unwrap();
3423            assert_eq!(expected_value, stats_min_bytes);
3424            assert_eq!(expected_value, stats_max_bytes);
3425        } else {
3426            panic!("expecting Statistics::FixedLenByteArray");
3427        }
3428    }
3429
3430    #[test]
3431    fn test_statistics_truncating_byte_array_default() {
3432        let page_writer = get_test_page_writer();
3433
3434        // The default truncate length is 64 bytes
3435        let props = WriterProperties::builder().build().into();
3436        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
3437
3438        let mut data = vec![ByteArray::default(); 1];
3439        data[0].set_data(Bytes::from(String::from(
3440            "This string is longer than 64 bytes, so it will almost certainly be truncated.",
3441        )));
3442        writer.write_batch(&data, None, None).unwrap();
3443        writer.flush_data_pages().unwrap();
3444
3445        let r = writer.close().unwrap();
3446
3447        assert_eq!(1, r.rows_written);
3448
3449        let stats = r.metadata.statistics().expect("statistics");
3450        if let Statistics::ByteArray(_stats) = stats {
3451            let min_value = _stats.min_opt().unwrap();
3452            let max_value = _stats.max_opt().unwrap();
3453
3454            assert!(!_stats.min_is_exact());
3455            assert!(!_stats.max_is_exact());
3456
3457            let expected_len = 64;
3458            assert_eq!(min_value.len(), expected_len);
3459            assert_eq!(max_value.len(), expected_len);
3460
3461            let expected_min =
3462                "This string is longer than 64 bytes, so it will almost certainly".as_bytes();
3463            assert_eq!(expected_min, min_value.as_bytes());
3464            // note the max value is different from the min value: the last byte is incremented
3465            let expected_max =
3466                "This string is longer than 64 bytes, so it will almost certainlz".as_bytes();
3467            assert_eq!(expected_max, max_value.as_bytes());
3468        } else {
3469            panic!("expecting Statistics::ByteArray");
3470        }
3471    }
3472
3473    #[test]
3474    fn test_statistics_truncating_byte_array() {
3475        let page_writer = get_test_page_writer();
3476
3477        const TEST_TRUNCATE_LENGTH: usize = 1;
3478
3479        // Truncate values at 1 byte
3480        let builder =
3481            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3482        let props = Arc::new(builder.build());
3483        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
3484
3485        let mut data = vec![ByteArray::default(); 1];
3486        // This is the expected min value
3487        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3488
3489        writer.write_batch(&data, None, None).unwrap();
3490
3491        writer.flush_data_pages().unwrap();
3492
3493        let r = writer.close().unwrap();
3494
3495        assert_eq!(1, r.rows_written);
3496
3497        let stats = r.metadata.statistics().expect("statistics");
3498        assert_eq!(stats.null_count_opt(), Some(0));
3499        assert_eq!(stats.distinct_count_opt(), None);
3500        if let Statistics::ByteArray(_stats) = stats {
3501            let min_value = _stats.min_opt().unwrap();
3502            let max_value = _stats.max_opt().unwrap();
3503
3504            assert!(!_stats.min_is_exact());
3505            assert!(!_stats.max_is_exact());
3506
3507            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3508            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3509
3510            assert_eq!("B".as_bytes(), min_value.as_bytes());
3511            assert_eq!("C".as_bytes(), max_value.as_bytes());
3512        } else {
3513            panic!("expecting Statistics::ByteArray");
3514        }
3515    }
3516
3517    #[test]
3518    fn test_statistics_truncating_fixed_len_byte_array() {
3519        let page_writer = get_test_page_writer();
3520
3521        const TEST_TRUNCATE_LENGTH: usize = 1;
3522
3523        // Truncate values at 1 byte
3524        let builder =
3525            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3526        let props = Arc::new(builder.build());
3527        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3528
3529        let mut data = vec![FixedLenByteArray::default(); 1];
3530
3531        const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
3532        const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();
3533
3534        const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]]; // parquet specifies big-endian order for decimals
3535        const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] =
3536            [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0];
3537
3538        // This is the expected min value
3539        data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));
3540
3541        writer.write_batch(&data, None, None).unwrap();
3542
3543        writer.flush_data_pages().unwrap();
3544
3545        let r = writer.close().unwrap();
3546
3547        assert_eq!(1, r.rows_written);
3548
3549        let stats = r.metadata.statistics().expect("statistics");
3550        assert_eq!(stats.null_count_opt(), Some(0));
3551        assert_eq!(stats.distinct_count_opt(), None);
3552        if let Statistics::FixedLenByteArray(_stats) = stats {
3553            let min_value = _stats.min_opt().unwrap();
3554            let max_value = _stats.max_opt().unwrap();
3555
3556            assert!(!_stats.min_is_exact());
3557            assert!(!_stats.max_is_exact());
3558
3559            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3560            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3561
3562            assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
3563            assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());
3564
3565            let reconstructed_min = i128::from_be_bytes([
3566                min_value.as_bytes()[0],
3567                0,
3568                0,
3569                0,
3570                0,
3571                0,
3572                0,
3573                0,
3574                0,
3575                0,
3576                0,
3577                0,
3578                0,
3579                0,
3580                0,
3581                0,
3582            ]);
3583
3584            let reconstructed_max = i128::from_be_bytes([
3585                max_value.as_bytes()[0],
3586                0,
3587                0,
3588                0,
3589                0,
3590                0,
3591                0,
3592                0,
3593                0,
3594                0,
3595                0,
3596                0,
3597                0,
3598                0,
3599                0,
3600                0,
3601            ]);
3602
3603            // check that the inner value is correctly bounded by the min/max
3604            println!("min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}");
3605            assert!(reconstructed_min <= PSEUDO_DECIMAL_VALUE);
3606            println!("max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}");
3607            assert!(reconstructed_max >= PSEUDO_DECIMAL_VALUE);
3608        } else {
3609            panic!("expecting Statistics::FixedLenByteArray");
3610        }
3611    }
3612
3613    #[test]
3614    fn test_send() {
3615        fn test<T: Send>() {}
3616        test::<ColumnWriterImpl<Int32Type>>();
3617    }
3618
3619    #[test]
3620    fn test_increment() {
3621        let v = increment(vec![0, 0, 0]).unwrap();
3622        assert_eq!(&v, &[0, 0, 1]);
3623
3624        // Handle overflow
3625        let v = increment(vec![0, 255, 255]).unwrap();
3626        assert_eq!(&v, &[1, 0, 0]);
3627
3628        // Return `None` if all bytes are u8::MAX
3629        let v = increment(vec![255, 255, 255]);
3630        assert!(v.is_none());
3631    }
3632
    #[test]
    fn test_increment_utf8() {
        // Helper: increment `o` as UTF-8 and check the result equals
        // `expected`, compares greater than the original string, and also
        // compares greater at the ByteArray (byte-wise) level.
        let test_inc = |o: &str, expected: &str| {
            if let Ok(v) = String::from_utf8(increment_utf8(o).unwrap()) {
                // Got the expected result...
                assert_eq!(v, expected);
                // and it's greater than the original string
                assert!(*v > *o);
                // Also show that BinaryArray level comparison works here
                let mut greater = ByteArray::new();
                greater.set_data(Bytes::from(v));
                let mut original = ByteArray::new();
                original.set_data(Bytes::from(o.as_bytes().to_vec()));
                assert!(greater > original);
            } else {
                panic!("Expected incremented UTF8 string to also be valid.");
            }
        };

        // Basic ASCII case
        test_inc("hello", "hellp");

        // 1-byte ending in max 1-byte: the max code point is dropped and the
        // preceding character is incremented instead
        test_inc("a\u{7f}", "b");

        // 1-byte max should not truncate as it would need 2-byte code points
        assert!(increment_utf8("\u{7f}\u{7f}").is_none());

        // UTF8 string
        test_inc("❤️🧡💛💚💙💜", "❤️🧡💛💚💙💝");

        // 2-byte without overflow
        test_inc("éééé", "éééê");

        // 2-byte that overflows lowest byte
        test_inc("\u{ff}\u{ff}", "\u{ff}\u{100}");

        // 2-byte ending in max 2-byte
        test_inc("a\u{7ff}", "b");

        // Max 2-byte should not truncate as it would need 3-byte code points
        assert!(increment_utf8("\u{7ff}\u{7ff}").is_none());

        // 3-byte without overflow [U+800, U+800] -> [U+800, U+801] (note that these
        // characters should render right to left).
        test_inc("ࠀࠀ", "ࠀࠁ");

        // 3-byte ending in max 3-byte
        test_inc("a\u{ffff}", "b");

        // Max 3-byte should not truncate as it would need 4-byte code points
        assert!(increment_utf8("\u{ffff}\u{ffff}").is_none());

        // 4-byte without overflow
        test_inc("𐀀𐀀", "𐀀𐀁");

        // 4-byte ending in max unicode
        test_inc("a\u{10ffff}", "b");

        // Max 4-byte should not truncate
        assert!(increment_utf8("\u{10ffff}\u{10ffff}").is_none());

        // Skip over surrogate pair range (0xD800..=0xDFFF): rather than bump
        // U+D7FF across the surrogate gap to U+E000, the trailing character
        // is dropped and the prefix incremented — NOTE(review): presumably
        // the increment does not cross into a wider encoding; confirm intent.
        test_inc("a\u{D7FF}", "b");
    }
3699
    #[test]
    fn test_truncate_utf8() {
        // No-op: truncating to the full length returns the bytes unchanged
        let data = "❤️🧡💛💚💙💜";
        let r = truncate_utf8(data, data.len()).unwrap();
        assert_eq!(r.len(), data.len());
        assert_eq!(&r, data.as_bytes());

        // We slice it away from the UTF8 boundary: asking for 13 bytes backs
        // off to the previous character boundary (10 bytes)
        let r = truncate_utf8(data, 13).unwrap();
        assert_eq!(r.len(), 10);
        assert_eq!(&r, "❤️🧡".as_bytes());

        // One multi-byte code point, and a length shorter than it, so we can't slice it
        let r = truncate_utf8("\u{0836}", 1);
        assert!(r.is_none());

        // Test truncate and increment for max bounds on UTF-8 statistics
        // 7-bit (i.e. ASCII)
        let r = truncate_and_increment_utf8("yyyyyyyyy", 8).unwrap();
        assert_eq!(&r, "yyyyyyyz".as_bytes());

        // 2-byte without overflow
        let r = truncate_and_increment_utf8("ééééé", 7).unwrap();
        assert_eq!(&r, "ééê".as_bytes());

        // 2-byte that overflows lowest byte
        let r = truncate_and_increment_utf8("\u{ff}\u{ff}\u{ff}\u{ff}\u{ff}", 8).unwrap();
        assert_eq!(&r, "\u{ff}\u{ff}\u{ff}\u{100}".as_bytes());

        // max 2-byte should not truncate as it would need 3-byte code points
        let r = truncate_and_increment_utf8("߿߿߿߿߿", 8);
        assert!(r.is_none());

        // 3-byte without overflow [U+800, U+800, U+800] -> [U+800, U+801] (note that these
        // characters should render right to left).
        let r = truncate_and_increment_utf8("ࠀࠀࠀࠀ", 8).unwrap();
        assert_eq!(&r, "ࠀࠁ".as_bytes());

        // max 3-byte should not truncate as it would need 4-byte code points
        let r = truncate_and_increment_utf8("\u{ffff}\u{ffff}\u{ffff}", 8);
        assert!(r.is_none());

        // 4-byte without overflow
        let r = truncate_and_increment_utf8("𐀀𐀀𐀀𐀀", 9).unwrap();
        assert_eq!(&r, "𐀀𐀁".as_bytes());

        // max 4-byte should not truncate
        let r = truncate_and_increment_utf8("\u{10ffff}\u{10ffff}", 8);
        assert!(r.is_none());
    }
3751
3752    #[test]
3753    // Check fallback truncation of statistics that should be UTF-8, but aren't
3754    // (see https://github.com/apache/arrow-rs/pull/6870).
3755    fn test_byte_array_truncate_invalid_utf8_statistics() {
3756        let message_type = "
3757            message test_schema {
3758                OPTIONAL BYTE_ARRAY a (UTF8);
3759            }
3760        ";
3761        let schema = Arc::new(parse_message_type(message_type).unwrap());
3762
3763        // Create Vec<ByteArray> containing non-UTF8 bytes
3764        let data = vec![ByteArray::from(vec![128u8; 32]); 7];
3765        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
3766        let file: File = tempfile::tempfile().unwrap();
3767        let props = Arc::new(
3768            WriterProperties::builder()
3769                .set_statistics_enabled(EnabledStatistics::Chunk)
3770                .set_statistics_truncate_length(Some(8))
3771                .build(),
3772        );
3773
3774        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
3775        let mut row_group_writer = writer.next_row_group().unwrap();
3776
3777        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
3778        col_writer
3779            .typed::<ByteArrayType>()
3780            .write_batch(&data, Some(&def_levels), None)
3781            .unwrap();
3782        col_writer.close().unwrap();
3783        row_group_writer.close().unwrap();
3784        let file_metadata = writer.close().unwrap();
3785        let stats = file_metadata.row_group(0).column(0).statistics().unwrap();
3786        assert!(!stats.max_is_exact());
3787        // Truncation of invalid UTF-8 should fall back to binary truncation, so last byte should
3788        // be incremented by 1.
3789        assert_eq!(
3790            stats.max_bytes_opt().map(|v| v.to_vec()),
3791            Some([128, 128, 128, 128, 128, 128, 128, 129].to_vec())
3792        );
3793    }
3794
3795    #[test]
3796    fn test_increment_max_binary_chars() {
3797        let r = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]);
3798        assert_eq!(&r.unwrap(), &[0xFF, 0xFE, 0xFE, 0x00, 0x00]);
3799
3800        let incremented = increment(vec![0xFF, 0xFF, 0xFF]);
3801        assert!(incremented.is_none())
3802    }
3803
3804    #[test]
3805    fn test_no_column_index_when_stats_disabled() {
3806        // https://github.com/apache/arrow-rs/issues/6010
3807        // Test that column index is not created/written for all-nulls column when page
3808        // statistics are disabled.
3809        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3810        let props = Arc::new(
3811            WriterProperties::builder()
3812                .set_statistics_enabled(EnabledStatistics::None)
3813                .build(),
3814        );
3815        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3816        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3817
3818        let data = Vec::new();
3819        let def_levels = vec![0; 10];
3820        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3821        writer.flush_data_pages().unwrap();
3822
3823        let column_close_result = writer.close().unwrap();
3824        assert!(column_close_result.offset_index.is_some());
3825        assert!(column_close_result.column_index.is_none());
3826    }
3827
3828    #[test]
3829    fn test_no_offset_index_when_disabled() {
3830        // Test that offset indexes can be disabled
3831        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3832        let props = Arc::new(
3833            WriterProperties::builder()
3834                .set_statistics_enabled(EnabledStatistics::None)
3835                .set_offset_index_disabled(true)
3836                .build(),
3837        );
3838        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3839        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3840
3841        let data = Vec::new();
3842        let def_levels = vec![0; 10];
3843        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3844        writer.flush_data_pages().unwrap();
3845
3846        let column_close_result = writer.close().unwrap();
3847        assert!(column_close_result.offset_index.is_none());
3848        assert!(column_close_result.column_index.is_none());
3849    }
3850
3851    #[test]
3852    fn test_offset_index_overridden() {
3853        // Test that offset indexes are not disabled when gathering page statistics
3854        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3855        let props = Arc::new(
3856            WriterProperties::builder()
3857                .set_statistics_enabled(EnabledStatistics::Page)
3858                .set_offset_index_disabled(true)
3859                .build(),
3860        );
3861        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3862        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3863
3864        let data = Vec::new();
3865        let def_levels = vec![0; 10];
3866        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3867        writer.flush_data_pages().unwrap();
3868
3869        let column_close_result = writer.close().unwrap();
3870        assert!(column_close_result.offset_index.is_some());
3871        assert!(column_close_result.column_index.is_some());
3872    }
3873
    #[test]
    // Verifies the boundary order recorded in the column index for various page
    // min/max sequences. Pages containing only nulls appear to be skipped when the
    // order is determined, and single-page / all-equal sequences report ASCENDING.
    fn test_boundary_order() -> Result<()> {
        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
        // min max both ascending
        let column_close_result = write_multiple_pages::<Int32Type>(
            &descr,
            &[
                &[Some(-10), Some(10)],
                &[Some(-5), Some(11)],
                &[None],
                &[Some(-5), Some(11)],
            ],
        )?;
        let boundary_order = column_close_result
            .column_index
            .unwrap()
            .get_boundary_order();
        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));

        // min max both descending
        let column_close_result = write_multiple_pages::<Int32Type>(
            &descr,
            &[
                &[Some(10), Some(11)],
                &[Some(5), Some(11)],
                &[None],
                &[Some(-5), Some(0)],
            ],
        )?;
        let boundary_order = column_close_result
            .column_index
            .unwrap()
            .get_boundary_order();
        assert_eq!(boundary_order, Some(BoundaryOrder::DESCENDING));

        // min max both equal
        let column_close_result = write_multiple_pages::<Int32Type>(
            &descr,
            &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]],
        )?;
        let boundary_order = column_close_result
            .column_index
            .unwrap()
            .get_boundary_order();
        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));

        // only nulls
        let column_close_result =
            write_multiple_pages::<Int32Type>(&descr, &[&[None], &[None], &[None]])?;
        let boundary_order = column_close_result
            .column_index
            .unwrap()
            .get_boundary_order();
        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));

        // one page
        let column_close_result =
            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)]])?;
        let boundary_order = column_close_result
            .column_index
            .unwrap()
            .get_boundary_order();
        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));

        // one non-null page
        let column_close_result =
            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)], &[None]])?;
        let boundary_order = column_close_result
            .column_index
            .unwrap()
            .get_boundary_order();
        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));

        // min max both unordered
        let column_close_result = write_multiple_pages::<Int32Type>(
            &descr,
            &[
                &[Some(10), Some(11)],
                &[Some(11), Some(16)],
                &[None],
                &[Some(-5), Some(0)],
            ],
        )?;
        let boundary_order = column_close_result
            .column_index
            .unwrap()
            .get_boundary_order();
        assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));

        // min max both ordered in different orders
        // (mins ascend 1,2,3 while maxes descend 9,8,7 -> neither order holds)
        let column_close_result = write_multiple_pages::<Int32Type>(
            &descr,
            &[
                &[Some(1), Some(9)],
                &[Some(2), Some(8)],
                &[None],
                &[Some(3), Some(7)],
            ],
        )?;
        let boundary_order = column_close_result
            .column_index
            .unwrap()
            .get_boundary_order();
        assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));

        Ok(())
    }
3981
    #[test]
    // ensure that logical types account for different sort order than underlying
    // physical type representation
    fn test_boundary_order_logical_type() -> Result<()> {
        // Two descriptors over the same physical FIXED_LEN_BYTE_ARRAY(2) bytes:
        // one annotated as Float16 (compared as floats) and one bare (compared as
        // raw bytes).
        let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));
        let fba_descr = {
            let tpe = SchemaType::primitive_type_builder(
                "col",
                FixedLenByteArrayType::get_physical_type(),
            )
            .with_length(2)
            .build()?;
            Arc::new(ColumnDescriptor::new(
                Arc::new(tpe),
                1,
                0,
                ColumnPath::from("col"),
            ))
        };

        // One value per page: 1.0, 0.0, -0.0, -1.0 — strictly descending as floats.
        let values: &[&[Option<FixedLenByteArray>]] = &[
            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ONE)))],
            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ZERO)))],
            &[Some(FixedLenByteArray::from(ByteArray::from(
                f16::NEG_ZERO,
            )))],
            &[Some(FixedLenByteArray::from(ByteArray::from(f16::NEG_ONE)))],
        ];

        // f16 descending
        let column_close_result =
            write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?;
        let boundary_order = column_close_result
            .column_index
            .unwrap()
            .get_boundary_order();
        assert_eq!(boundary_order, Some(BoundaryOrder::DESCENDING));

        // same bytes, but fba unordered
        let column_close_result =
            write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?;
        let boundary_order = column_close_result
            .column_index
            .unwrap()
            .get_boundary_order();
        assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));

        Ok(())
    }
4031
4032    #[test]
4033    fn test_interval_stats_should_not_have_min_max() {
4034        let input = [
4035            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
4036            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
4037            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2],
4038        ]
4039        .into_iter()
4040        .map(|s| ByteArray::from(s).into())
4041        .collect::<Vec<_>>();
4042
4043        let page_writer = get_test_page_writer();
4044        let mut writer = get_test_interval_column_writer(page_writer);
4045        writer.write_batch(&input, None, None).unwrap();
4046
4047        let metadata = writer.close().unwrap().metadata;
4048        let stats = if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
4049            stats.clone()
4050        } else {
4051            panic!("metadata missing statistics");
4052        };
4053        assert!(stats.min_bytes_opt().is_none());
4054        assert!(stats.max_bytes_opt().is_none());
4055    }
4056
    #[test]
    #[cfg(feature = "arrow")]
    fn test_column_writer_get_estimated_total_bytes() {
        // The size estimate starts at zero and grows as data pages are added.
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        assert_eq!(writer.get_estimated_total_bytes(), 0);

        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
        writer.add_data_page().unwrap();
        let size_with_one_page = writer.get_estimated_total_bytes();
        assert_eq!(size_with_one_page, 20);

        writer.write_batch(&[5, 6, 7, 8], None, None).unwrap();
        writer.add_data_page().unwrap();
        let size_with_two_pages = writer.get_estimated_total_bytes();
        // different pages have different compressed lengths
        assert_eq!(size_with_two_pages, 20 + 21);
    }
4076
4077    fn write_multiple_pages<T: DataType>(
4078        column_descr: &Arc<ColumnDescriptor>,
4079        pages: &[&[Option<T::T>]],
4080    ) -> Result<ColumnCloseResult> {
4081        let column_writer = get_column_writer(
4082            column_descr.clone(),
4083            Default::default(),
4084            get_test_page_writer(),
4085        );
4086        let mut writer = get_typed_column_writer::<T>(column_writer);
4087
4088        for &page in pages {
4089            let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
4090            let def_levels = page
4091                .iter()
4092                .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
4093                .collect::<Vec<_>>();
4094            writer.write_batch(&values, Some(&def_levels), None)?;
4095            writer.flush_data_pages()?;
4096        }
4097
4098        writer.close()
4099    }
4100
4101    /// Performs write-read roundtrip with randomly generated values and levels.
4102    /// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
4103    /// for a column.
4104    fn column_roundtrip_random<T: DataType>(
4105        props: WriterProperties,
4106        max_size: usize,
4107        min_value: T::T,
4108        max_value: T::T,
4109        max_def_level: i16,
4110        max_rep_level: i16,
4111    ) where
4112        T::T: PartialOrd + SampleUniform + Copy,
4113    {
4114        let mut num_values: usize = 0;
4115
4116        let mut buf: Vec<i16> = Vec::new();
4117        let def_levels = if max_def_level > 0 {
4118            random_numbers_range(max_size, 0, max_def_level + 1, &mut buf);
4119            for &dl in &buf[..] {
4120                if dl == max_def_level {
4121                    num_values += 1;
4122                }
4123            }
4124            Some(&buf[..])
4125        } else {
4126            num_values = max_size;
4127            None
4128        };
4129
4130        let mut buf: Vec<i16> = Vec::new();
4131        let rep_levels = if max_rep_level > 0 {
4132            random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
4133            buf[0] = 0; // Must start on record boundary
4134            Some(&buf[..])
4135        } else {
4136            None
4137        };
4138
4139        let mut values: Vec<T::T> = Vec::new();
4140        random_numbers_range(num_values, min_value, max_value, &mut values);
4141
4142        column_roundtrip::<T>(props, &values[..], def_levels, rep_levels);
4143    }
4144
    /// Performs write-read roundtrip and asserts written values and levels.
    fn column_roundtrip<T: DataType>(
        props: WriterProperties,
        values: &[T::T],
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) {
        let mut file = tempfile::tempfile().unwrap();
        let mut write = TrackedWrite::new(&mut file);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));

        // Derive the column's max levels from the data being written.
        let max_def_level = match def_levels {
            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
            None => 0i16,
        };

        let max_rep_level = match rep_levels {
            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
            None => 0i16,
        };

        // Batch size must be large enough to cover values and both level slices.
        let mut max_batch_size = values.len();
        if let Some(levels) = def_levels {
            max_batch_size = max_batch_size.max(levels.len());
        }
        if let Some(levels) = rep_levels {
            max_batch_size = max_batch_size.max(levels.len());
        }

        let mut writer =
            get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));

        let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
        assert_eq!(values_written, values.len());
        let result = writer.close().unwrap();

        // Release the `TrackedWrite` borrow on `file` before handing the file to
        // the reader.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let page_reader = Box::new(
            SerializedPageReader::new_with_properties(
                Arc::new(file),
                &result.metadata,
                result.rows_written as usize,
                None,
                Arc::new(props),
            )
            .unwrap(),
        );
        let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level);

        let mut actual_values = Vec::with_capacity(max_batch_size);
        // Only collect levels back if they were supplied on the write side.
        let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size));
        let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size));

        let (_, values_read, levels_read) = reader
            .read_records(
                max_batch_size,
                actual_def_levels.as_mut(),
                actual_rep_levels.as_mut(),
                &mut actual_values,
            )
            .unwrap();

        // Assert values, definition and repetition levels.

        assert_eq!(&actual_values[..values_read], values);
        match actual_def_levels {
            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels),
            None => assert_eq!(None, def_levels),
        }
        match actual_rep_levels {
            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels),
            None => assert_eq!(None, rep_levels),
        }

        // Assert written rows.

        if let Some(levels) = actual_rep_levels {
            // A repetition level of 0 starts a new record (row).
            let mut actual_rows_written = 0;
            for l in levels {
                if l == 0 {
                    actual_rows_written += 1;
                }
            }
            assert_eq!(actual_rows_written, result.rows_written);
        } else if actual_def_levels.is_some() {
            // Without repetition levels, each level entry is its own row.
            assert_eq!(levels_read as u64, result.rows_written);
        } else {
            assert_eq!(values_read as u64, result.rows_written);
        }
    }
4239
4240    /// Performs write of provided values and returns column metadata of those values.
4241    /// Used to test encoding support for column writer.
4242    fn column_write_and_get_metadata<T: DataType>(
4243        props: WriterProperties,
4244        values: &[T::T],
4245    ) -> ColumnChunkMetaData {
4246        let page_writer = get_test_page_writer();
4247        let props = Arc::new(props);
4248        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4249        writer.write_batch(values, None, None).unwrap();
4250        writer.close().unwrap().metadata
4251    }
4252
4253    // Helper function to more compactly create a PageEncodingStats struct.
4254    fn encoding_stats(page_type: PageType, encoding: Encoding, count: i32) -> PageEncodingStats {
4255        PageEncodingStats {
4256            page_type,
4257            encoding,
4258            count,
4259        }
4260    }
4261
4262    // Function to use in tests for EncodingWriteSupport. This checks that dictionary
4263    // offset and encodings to make sure that column writer uses provided by trait
4264    // encodings.
4265    fn check_encoding_write_support<T: DataType>(
4266        version: WriterVersion,
4267        dict_enabled: bool,
4268        data: &[T::T],
4269        dictionary_page_offset: Option<i64>,
4270        encodings: &[Encoding],
4271        page_encoding_stats: &[PageEncodingStats],
4272    ) {
4273        let props = WriterProperties::builder()
4274            .set_writer_version(version)
4275            .set_dictionary_enabled(dict_enabled)
4276            .build();
4277        let meta = column_write_and_get_metadata::<T>(props, data);
4278        assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
4279        assert_eq!(meta.encodings().collect::<Vec<_>>(), encodings);
4280        assert_eq!(meta.page_encoding_stats().unwrap(), page_encoding_stats);
4281    }
4282
4283    /// Returns column writer.
4284    fn get_test_column_writer<'a, T: DataType>(
4285        page_writer: Box<dyn PageWriter + 'a>,
4286        max_def_level: i16,
4287        max_rep_level: i16,
4288        props: WriterPropertiesPtr,
4289    ) -> ColumnWriterImpl<'a, T> {
4290        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4291        let column_writer = get_column_writer(descr, props, page_writer);
4292        get_typed_column_writer::<T>(column_writer)
4293    }
4294
4295    fn get_test_column_writer_with_path<'a, T: DataType>(
4296        page_writer: Box<dyn PageWriter + 'a>,
4297        max_def_level: i16,
4298        max_rep_level: i16,
4299        props: WriterPropertiesPtr,
4300        path: ColumnPath,
4301    ) -> ColumnWriterImpl<'a, T> {
4302        let descr = Arc::new(get_test_column_descr_with_path::<T>(
4303            max_def_level,
4304            max_rep_level,
4305            path,
4306        ));
4307        let column_writer = get_column_writer(descr, props, page_writer);
4308        get_typed_column_writer::<T>(column_writer)
4309    }
4310
4311    /// Returns column reader.
4312    fn get_test_column_reader<T: DataType>(
4313        page_reader: Box<dyn PageReader>,
4314        max_def_level: i16,
4315        max_rep_level: i16,
4316    ) -> ColumnReaderImpl<T> {
4317        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4318        let column_reader = get_column_reader(descr, page_reader);
4319        get_typed_column_reader::<T>(column_reader)
4320    }
4321
4322    /// Returns descriptor for primitive column.
4323    fn get_test_column_descr<T: DataType>(
4324        max_def_level: i16,
4325        max_rep_level: i16,
4326    ) -> ColumnDescriptor {
4327        let path = ColumnPath::from("col");
4328        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4329            // length is set for "encoding support" tests for FIXED_LEN_BYTE_ARRAY type,
4330            // it should be no-op for other types
4331            .with_length(1)
4332            .build()
4333            .unwrap();
4334        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4335    }
4336
4337    fn get_test_column_descr_with_path<T: DataType>(
4338        max_def_level: i16,
4339        max_rep_level: i16,
4340        path: ColumnPath,
4341    ) -> ColumnDescriptor {
4342        let name = path.string();
4343        let tpe = SchemaType::primitive_type_builder(&name, T::get_physical_type())
4344            // length is set for "encoding support" tests for FIXED_LEN_BYTE_ARRAY type,
4345            // it should be no-op for other types
4346            .with_length(1)
4347            .build()
4348            .unwrap();
4349        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4350    }
4351
4352    fn write_and_collect_page_values(
4353        path: ColumnPath,
4354        props: WriterPropertiesPtr,
4355        data: &[i32],
4356    ) -> Vec<u32> {
4357        let mut file = tempfile::tempfile().unwrap();
4358        let mut write = TrackedWrite::new(&mut file);
4359        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
4360        let mut writer =
4361            get_test_column_writer_with_path::<Int32Type>(page_writer, 0, 0, props, path);
4362        writer.write_batch(data, None, None).unwrap();
4363        let r = writer.close().unwrap();
4364
4365        drop(write);
4366
4367        let props = ReaderProperties::builder()
4368            .set_backward_compatible_lz4(false)
4369            .build();
4370        let mut page_reader = Box::new(
4371            SerializedPageReader::new_with_properties(
4372                Arc::new(file),
4373                &r.metadata,
4374                r.rows_written as usize,
4375                None,
4376                Arc::new(props),
4377            )
4378            .unwrap(),
4379        );
4380
4381        let mut values_per_page = Vec::new();
4382        while let Some(page) = page_reader.get_next_page().unwrap() {
4383            assert_eq!(page.page_type(), PageType::DATA_PAGE);
4384            values_per_page.push(page.num_values());
4385        }
4386
4387        values_per_page
4388    }
4389
4390    /// Returns page writer that collects pages without serializing them.
4391    fn get_test_page_writer() -> Box<dyn PageWriter> {
4392        Box::new(TestPageWriter {})
4393    }
4394
4395    struct TestPageWriter {}
4396
4397    impl PageWriter for TestPageWriter {
4398        fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
4399            let mut res = PageWriteSpec::new();
4400            res.page_type = page.page_type();
4401            res.uncompressed_size = page.uncompressed_size();
4402            res.compressed_size = page.compressed_size();
4403            res.num_values = page.num_values();
4404            res.offset = 0;
4405            res.bytes_written = page.data().len() as u64;
4406            Ok(res)
4407        }
4408
4409        fn close(&mut self) -> Result<()> {
4410            Ok(())
4411        }
4412    }
4413
4414    /// Write data into parquet using [`get_test_page_writer`] and [`get_test_column_writer`] and returns generated statistics.
4415    fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics {
4416        let page_writer = get_test_page_writer();
4417        let props = Default::default();
4418        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4419        writer.write_batch(values, None, None).unwrap();
4420
4421        let metadata = writer.close().unwrap().metadata;
4422        if let Some(stats) = metadata.statistics() {
4423            stats.clone()
4424        } else {
4425            panic!("metadata missing statistics");
4426        }
4427    }
4428
4429    /// Returns Decimals column writer.
4430    fn get_test_decimals_column_writer<T: DataType>(
4431        page_writer: Box<dyn PageWriter>,
4432        max_def_level: i16,
4433        max_rep_level: i16,
4434        props: WriterPropertiesPtr,
4435    ) -> ColumnWriterImpl<'static, T> {
4436        let descr = Arc::new(get_test_decimals_column_descr::<T>(
4437            max_def_level,
4438            max_rep_level,
4439        ));
4440        let column_writer = get_column_writer(descr, props, page_writer);
4441        get_typed_column_writer::<T>(column_writer)
4442    }
4443
4444    /// Returns descriptor for Decimal type with primitive column.
4445    fn get_test_decimals_column_descr<T: DataType>(
4446        max_def_level: i16,
4447        max_rep_level: i16,
4448    ) -> ColumnDescriptor {
4449        let path = ColumnPath::from("col");
4450        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4451            .with_length(16)
4452            .with_logical_type(Some(LogicalType::Decimal {
4453                scale: 2,
4454                precision: 3,
4455            }))
4456            .with_scale(2)
4457            .with_precision(3)
4458            .build()
4459            .unwrap();
4460        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4461    }
4462
4463    fn float16_statistics_roundtrip(
4464        values: &[FixedLenByteArray],
4465    ) -> ValueStatistics<FixedLenByteArray> {
4466        let page_writer = get_test_page_writer();
4467        let mut writer = get_test_float16_column_writer(page_writer, Default::default());
4468        writer.write_batch(values, None, None).unwrap();
4469
4470        let metadata = writer.close().unwrap().metadata;
4471        if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
4472            stats.clone()
4473        } else {
4474            panic!("metadata missing statistics");
4475        }
4476    }
4477
4478    fn get_test_float16_column_writer(
4479        page_writer: Box<dyn PageWriter>,
4480        props: WriterPropertiesPtr,
4481    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4482        let descr = Arc::new(get_test_float16_column_descr(0, 0));
4483        let column_writer = get_column_writer(descr, props, page_writer);
4484        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4485    }
4486
4487    fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
4488        let path = ColumnPath::from("col");
4489        let tpe =
4490            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4491                .with_length(2)
4492                .with_logical_type(Some(LogicalType::Float16))
4493                .build()
4494                .unwrap();
4495        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4496    }
4497
4498    fn get_test_interval_column_writer(
4499        page_writer: Box<dyn PageWriter>,
4500    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4501        let descr = Arc::new(get_test_interval_column_descr());
4502        let column_writer = get_column_writer(descr, Default::default(), page_writer);
4503        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4504    }
4505
4506    fn get_test_interval_column_descr() -> ColumnDescriptor {
4507        let path = ColumnPath::from("col");
4508        let tpe =
4509            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4510                .with_length(12)
4511                .with_converted_type(ConvertedType::INTERVAL)
4512                .build()
4513                .unwrap();
4514        ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
4515    }
4516
4517    /// Returns column writer for UINT32 Column provided as ConvertedType only
4518    fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
4519        page_writer: Box<dyn PageWriter + 'a>,
4520        max_def_level: i16,
4521        max_rep_level: i16,
4522        props: WriterPropertiesPtr,
4523    ) -> ColumnWriterImpl<'a, T> {
4524        let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
4525            max_def_level,
4526            max_rep_level,
4527        ));
4528        let column_writer = get_column_writer(descr, props, page_writer);
4529        get_typed_column_writer::<T>(column_writer)
4530    }
4531
4532    /// Returns column descriptor for UINT32 Column provided as ConvertedType only
4533    fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
4534        max_def_level: i16,
4535        max_rep_level: i16,
4536    ) -> ColumnDescriptor {
4537        let path = ColumnPath::from("col");
4538        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4539            .with_converted_type(ConvertedType::UINT_32)
4540            .build()
4541            .unwrap();
4542        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4543    }
4544
4545    #[test]
4546    fn test_page_v2_snappy_compression_fallback() {
4547        // Test that PageV2 sets is_compressed to false when Snappy compression increases data size
4548        let page_writer = TestPageWriter {};
4549
4550        // Create WriterProperties with PageV2 and Snappy compression
4551        let props = WriterProperties::builder()
4552            .set_writer_version(WriterVersion::PARQUET_2_0)
4553            // Disable dictionary to ensure data is written directly
4554            .set_dictionary_enabled(false)
4555            .set_compression(Compression::SNAPPY)
4556            .build();
4557
4558        let mut column_writer =
4559            get_test_column_writer::<ByteArrayType>(Box::new(page_writer), 0, 0, Arc::new(props));
4560
4561        // Create small, simple data that Snappy compression will likely increase in size
4562        // due to compression overhead for very small data
4563        let values = vec![ByteArray::from("a")];
4564
4565        column_writer.write_batch(&values, None, None).unwrap();
4566
4567        let result = column_writer.close().unwrap();
4568        assert_eq!(
4569            result.metadata.uncompressed_size(),
4570            result.metadata.compressed_size()
4571        );
4572    }
4573
    /// Builder-style harness for write/read roundtrip tests that feed level
    /// data to `write_batch_internal` via [`LevelDataRef`] variants and assert
    /// the read-back values/levels match the `expected_*` fields.
    struct ColumnRoundTripUniform<'a, T: DataType> {
        // Writer properties used when constructing the column writer.
        props: WriterProperties,
        // Values to write (one per non-null slot).
        values: &'a [T::T],
        // Definition levels handed to the writer; `Absent` means none supplied.
        def_levels: LevelDataRef<'a>,
        // Repetition levels handed to the writer; `Absent` means none supplied.
        rep_levels: LevelDataRef<'a>,
        // Maximum definition level of the column under test.
        max_def_level: i16,
        // Maximum repetition level of the column under test.
        max_rep_level: i16,
        // Values expected after reading the column back.
        expected_values: &'a [T::T],
        // Definition levels expected on read-back; `None` skips that assertion.
        expected_def_levels: Option<&'a [i16]>,
        // Repetition levels expected on read-back; `None` skips that assertion.
        expected_rep_levels: Option<&'a [i16]>,
    }
4585
    impl<'a, T: DataType> ColumnRoundTripUniform<'a, T>
    where
        T::T: PartialEq + std::fmt::Debug,
    {
        /// Creates a harness with no values, absent levels, and required
        /// column semantics (max def/rep levels of 0).
        fn new() -> Self {
            Self {
                props: Default::default(),
                values: &[],
                def_levels: LevelDataRef::Absent,
                rep_levels: LevelDataRef::Absent,
                max_def_level: 0,
                max_rep_level: 0,
                expected_values: &[],
                expected_def_levels: None,
                expected_rep_levels: None,
            }
        }

        /// Sets the writer properties used when building the column writer.
        fn with_props(mut self, props: WriterProperties) -> Self {
            self.props = props;
            self
        }

        /// Sets the values to write.
        fn with_values(mut self, values: &'a [T::T]) -> Self {
            self.values = values;
            self
        }

        /// Sets the definition levels passed to `write_batch_internal`.
        fn with_def_levels(mut self, def_levels: LevelDataRef<'a>) -> Self {
            self.def_levels = def_levels;
            self
        }

        /// Sets the repetition levels passed to `write_batch_internal`.
        fn with_rep_levels(mut self, rep_levels: LevelDataRef<'a>) -> Self {
            self.rep_levels = rep_levels;
            self
        }

        /// Sets the column's maximum definition level.
        fn with_max_def_level(mut self, max_def_level: i16) -> Self {
            self.max_def_level = max_def_level;
            self
        }

        /// Sets the column's maximum repetition level.
        fn with_max_rep_level(mut self, max_rep_level: i16) -> Self {
            self.max_rep_level = max_rep_level;
            self
        }

        /// Sets the values expected after reading the column back.
        fn with_expected_values(mut self, expected_values: &'a [T::T]) -> Self {
            self.expected_values = expected_values;
            self
        }

        /// Sets the definition levels expected on read-back; also enables the
        /// def-level assertion in `run`.
        fn with_expected_def_levels(mut self, expected_def_levels: &'a [i16]) -> Self {
            self.expected_def_levels = Some(expected_def_levels);
            self
        }

        /// Sets the repetition levels expected on read-back; also enables the
        /// rep-level assertion in `run`.
        fn with_expected_rep_levels(mut self, expected_rep_levels: &'a [i16]) -> Self {
            self.expected_rep_levels = Some(expected_rep_levels);
            self
        }

        /// Write-then-read roundtrip using `write_batch_internal` with the given
        /// [`LevelDataRef`] variants, and assert the read-back matches `expected_*`.
        fn run(self) {
            // Write phase: serialize one column chunk into a temp file.
            let mut file = tempfile::tempfile().unwrap();
            let mut write = TrackedWrite::new(&mut file);
            let page_writer = Box::new(SerializedPageWriter::new(&mut write));
            let mut writer = get_test_column_writer::<T>(
                page_writer,
                self.max_def_level,
                self.max_rep_level,
                Arc::new(self.props),
            );

            writer
                .write_batch_internal(
                    self.values,
                    None,
                    self.def_levels,
                    self.rep_levels,
                    None,
                    None,
                    None,
                )
                .unwrap();
            let result = writer.close().unwrap();
            // End the writer's mutable borrow of `file` so it can be moved
            // into the reader's Arc below.
            drop(write);

            let props = ReaderProperties::builder()
                .set_backward_compatible_lz4(false)
                .build();
            let page_reader = Box::new(
                SerializedPageReader::new_with_properties(
                    Arc::new(file),
                    &result.metadata,
                    result.rows_written as usize,
                    None,
                    Arc::new(props),
                )
                .unwrap(),
            );
            let mut reader =
                get_test_column_reader::<T>(page_reader, self.max_def_level, self.max_rep_level);

            // Slot count comes from the expected def levels when present
            // (null slots carry a level but no value); otherwise from values.
            let batch_size = self
                .expected_def_levels
                .map_or(self.expected_values.len(), |l| l.len());
            let mut actual_values = Vec::with_capacity(batch_size);
            // Only collect level buffers that the test actually asserts on.
            let mut actual_def = self
                .expected_def_levels
                .map(|_| Vec::with_capacity(batch_size));
            let mut actual_rep = self
                .expected_rep_levels
                .map(|_| Vec::with_capacity(batch_size));

            let (_, values_read, levels_read) = reader
                .read_records(
                    batch_size,
                    actual_def.as_mut(),
                    actual_rep.as_mut(),
                    &mut actual_values,
                )
                .unwrap();

            assert_eq!(&actual_values[..values_read], self.expected_values);
            if let Some(ref v) = actual_def {
                assert_eq!(&v[..levels_read], self.expected_def_levels.unwrap());
            }
            if let Some(ref v) = actual_rep {
                assert_eq!(&v[..levels_read], self.expected_rep_levels.unwrap());
            }
        }
    }
4721
4722    #[test]
4723    fn test_uniform_def_levels_all_null() {
4724        // All-null column: def_level=0 (null) for every slot, no values written.
4725        let max_def_level = 1;
4726        let count = 100;
4727        let expected_def_levels = vec![0i16; count];
4728        ColumnRoundTripUniform::<Int32Type>::new()
4729            .with_def_levels(LevelDataRef::Uniform { value: 0, count })
4730            .with_max_def_level(max_def_level)
4731            .with_expected_def_levels(&expected_def_levels)
4732            .run();
4733    }
4734
4735    #[test]
4736    fn test_uniform_def_levels_all_valid() {
4737        // All-valid column: def_level=max for every slot, all values written.
4738        let max_def_level = 1;
4739        let values: Vec<i32> = (0..50).collect();
4740        let expected_def_levels = vec![max_def_level; values.len()];
4741        ColumnRoundTripUniform::<Int32Type>::new()
4742            .with_values(&values)
4743            .with_def_levels(LevelDataRef::Uniform {
4744                value: max_def_level,
4745                count: values.len(),
4746            })
4747            .with_max_def_level(max_def_level)
4748            .with_expected_values(&values)
4749            .with_expected_def_levels(&expected_def_levels)
4750            .run();
4751    }
4752
4753    #[test]
4754    fn test_uniform_def_and_rep_levels() {
4755        // Simulates a list column where every row is null:
4756        // def=0, rep=0 for each row (one row = one entry with no child values).
4757        let max_def_level = 2;
4758        let max_rep_level = 1;
4759        let count = 200;
4760        let expected_def_levels = vec![0i16; count];
4761        let expected_rep_levels = vec![0i16; count];
4762        ColumnRoundTripUniform::<Int32Type>::new()
4763            .with_def_levels(LevelDataRef::Uniform { value: 0, count })
4764            .with_rep_levels(LevelDataRef::Uniform { value: 0, count })
4765            .with_max_def_level(max_def_level)
4766            .with_max_rep_level(max_rep_level)
4767            .with_expected_def_levels(&expected_def_levels)
4768            .with_expected_rep_levels(&expected_rep_levels)
4769            .run();
4770    }
4771
4772    #[test]
4773    fn test_uniform_levels_v1_and_v2() {
4774        // Verify uniform levels work identically for both Parquet writer versions.
4775        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
4776            let props = WriterProperties::builder()
4777                .set_writer_version(version)
4778                .build();
4779            let max_def = 1;
4780            let count = 100;
4781            let expected_def_levels = vec![0i16; count];
4782            ColumnRoundTripUniform::<Int32Type>::new()
4783                .with_props(props)
4784                .with_def_levels(LevelDataRef::Uniform { value: 0, count })
4785                .with_max_def_level(max_def)
4786                .with_expected_def_levels(&expected_def_levels)
4787                .run();
4788        }
4789    }
4790}