parquet/column/writer/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Contains column writer API.
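//!
//! # Example
//!
//! A minimal, hedged sketch of the write path, marked `ignore` because it
//! assumes a `page_writer` implementing [`PageWriter`] and a `descr`
//! ([`ColumnDescPtr`]) for a required INT32 column already exist:
//!
//! ```ignore
//! use std::sync::Arc;
//! use parquet::column::writer::{get_column_writer, ColumnWriter};
//! use parquet::file::properties::WriterProperties;
//!
//! let props = Arc::new(WriterProperties::builder().build());
//! let mut writer = get_column_writer(descr, props, page_writer);
//!
//! // Downcast to the typed writer and write a batch of values. A required
//! // (non-nullable, non-repeated) column needs no levels.
//! if let ColumnWriter::Int32ColumnWriter(ref mut typed) = writer {
//!     typed.write_batch(&[1, 2, 3], None, None)?;
//! }
//!
//! // Closing flushes any buffered pages and returns the chunk metadata.
//! let result = writer.close()?;
//! ```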

use bytes::Bytes;
use half::f16;

use crate::bloom_filter::Sbbf;
use crate::file::page_index::column_index::ColumnIndexMetaData;
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use std::collections::{BTreeSet, VecDeque};
use std::str;

use crate::basic::{
    BoundaryOrder, Compression, ConvertedType, Encoding, EncodingMask, LogicalType, PageType, Type,
};
use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
use crate::compression::{Codec, CodecOptionsBuilder, create_codec};
use crate::data_type::private::ParquetValueType;
use crate::data_type::*;
use crate::encodings::levels::LevelEncoder;
#[cfg(feature = "encryption")]
use crate::encryption::encrypt::get_column_crypto_metadata;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{
    ColumnChunkMetaData, ColumnChunkMetaDataBuilder, ColumnIndexBuilder, LevelHistogram,
    OffsetIndexBuilder, PageEncodingStats,
};
use crate::file::properties::{
    EnabledStatistics, WriterProperties, WriterPropertiesPtr, WriterVersion,
};
use crate::file::statistics::{Statistics, ValueStatistics};
use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};

pub(crate) mod encoder;

macro_rules! downcast_writer {
    ($e:expr, $i:ident, $b:expr) => {
        match $e {
            Self::BoolColumnWriter($i) => $b,
            Self::Int32ColumnWriter($i) => $b,
            Self::Int64ColumnWriter($i) => $b,
            Self::Int96ColumnWriter($i) => $b,
            Self::FloatColumnWriter($i) => $b,
            Self::DoubleColumnWriter($i) => $b,
            Self::ByteArrayColumnWriter($i) => $b,
            Self::FixedLenByteArrayColumnWriter($i) => $b,
        }
    };
}

/// Column writer for a Parquet type.
///
/// See [`get_column_writer`] to create instances of this type
pub enum ColumnWriter<'a> {
    /// Column writer for boolean type
    BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
    /// Column writer for int32 type
    Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
    /// Column writer for int64 type
    Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
    /// Column writer for int96 (timestamp) type
    Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
    /// Column writer for float type
    FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
    /// Column writer for double type
    DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
    /// Column writer for byte array type
    ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
    /// Column writer for fixed length byte array type
    FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
}

impl ColumnWriter<'_> {
    /// Returns the estimated total memory usage
    #[cfg(feature = "arrow")]
    pub(crate) fn memory_size(&self) -> usize {
        downcast_writer!(self, typed, typed.memory_size())
    }

    /// Returns the estimated total encoded bytes for this column writer
    #[cfg(feature = "arrow")]
    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
        downcast_writer!(self, typed, typed.get_estimated_total_bytes())
    }

    /// Close this [`ColumnWriter`], returning the metadata for the column chunk.
    pub fn close(self) -> Result<ColumnCloseResult> {
        downcast_writer!(self, typed, typed.close())
    }
}

/// Create a specific column writer corresponding to column descriptor `descr`.
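///
/// A hedged sketch of dispatching on the returned writer, marked `ignore`
/// because `descr`, `props`, and `page_writer` are assumed to already exist:
///
/// ```ignore
/// let writer = get_column_writer(descr, props, page_writer);
/// match writer {
///     // The enum variant corresponds to the column's physical type.
///     ColumnWriter::Int32ColumnWriter(mut typed) => {
///         typed.write_batch(&[1, 2, 3], None, None)?;
///         typed.close()?;
///     }
///     _ => unreachable!("expected an INT32 column"),
/// }
/// ```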
pub fn get_column_writer<'a>(
    descr: ColumnDescPtr,
    props: WriterPropertiesPtr,
    page_writer: Box<dyn PageWriter + 'a>,
) -> ColumnWriter<'a> {
    match descr.physical_type() {
        Type::BOOLEAN => {
            ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::INT32 => {
            ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::INT64 => {
            ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::INT96 => {
            ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::FLOAT => {
            ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::DOUBLE => {
            ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::BYTE_ARRAY => {
            ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter(
            ColumnWriterImpl::new(descr, props, page_writer),
        ),
    }
}

/// Gets a typed column writer for the specific type `T`, by "up-casting" `col_writer` of
/// non-generic type to a generic column writer type `ColumnWriterImpl`.
///
/// Panics if the actual enum value for `col_writer` does not match the type `T`.
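///
/// A brief sketch, assuming `writer` is a [`ColumnWriter`] wrapping an INT32
/// column (any other physical type would panic here):
///
/// ```ignore
/// let mut typed = get_typed_column_writer::<Int32Type>(writer);
/// typed.write_batch(&[10, 20], None, None)?;
/// ```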
pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> {
    T::get_column_writer(col_writer).unwrap_or_else(|| {
        panic!(
            "Failed to convert column writer into a typed column writer for `{}` type",
            T::get_physical_type()
        )
    })
}

/// Similar to `get_typed_column_writer` but returns a reference.
pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
    col_writer: &'b ColumnWriter<'a>,
) -> &'b ColumnWriterImpl<'a, T> {
    T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
        panic!(
            "Failed to convert column writer into a typed column writer for `{}` type",
            T::get_physical_type()
        )
    })
}

/// Similar to `get_typed_column_writer` but returns a mutable reference.
pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
    col_writer: &'a mut ColumnWriter<'b>,
) -> &'a mut ColumnWriterImpl<'b, T> {
    T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
        panic!(
            "Failed to convert column writer into a typed column writer for `{}` type",
            T::get_physical_type()
        )
    })
}

/// Metadata for a column chunk of a Parquet file.
///
/// Note this structure is returned by [`ColumnWriter::close`].
#[derive(Debug, Clone)]
pub struct ColumnCloseResult {
    /// The total number of bytes written
    pub bytes_written: u64,
    /// The total number of rows written
    pub rows_written: u64,
    /// Metadata for this column chunk
    pub metadata: ColumnChunkMetaData,
    /// Optional bloom filter for this column
    pub bloom_filter: Option<Sbbf>,
    /// Optional column index, for filtering
    pub column_index: Option<ColumnIndexMetaData>,
    /// Optional offset index, identifying page locations
    pub offset_index: Option<OffsetIndexMetaData>,
}

// Metrics per page
#[derive(Default)]
struct PageMetrics {
    num_buffered_values: u32,
    num_buffered_rows: u32,
    num_page_nulls: u64,
    repetition_level_histogram: Option<LevelHistogram>,
    definition_level_histogram: Option<LevelHistogram>,
}

impl PageMetrics {
    fn new() -> Self {
        Default::default()
    }

    /// Initialize the repetition level histogram
    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
        self
    }

    /// Initialize the definition level histogram
    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
        self.definition_level_histogram = LevelHistogram::try_new(max_level);
        self
    }

    /// Resets the state of this `PageMetrics` to the initial state.
    /// If histograms have been initialized, their contents will be reset to zero.
    fn new_page(&mut self) {
        self.num_buffered_values = 0;
        self.num_buffered_rows = 0;
        self.num_page_nulls = 0;
        if let Some(hist) = self.repetition_level_histogram.as_mut() {
            hist.reset();
        }
        if let Some(hist) = self.definition_level_histogram.as_mut() {
            hist.reset();
        }
    }

    /// Updates histogram values using provided repetition levels
    fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
        if let Some(ref mut rep_hist) = self.repetition_level_histogram {
            rep_hist.update_from_levels(levels);
        }
    }

    /// Updates histogram values using provided definition levels
    fn update_definition_level_histogram(&mut self, levels: &[i16]) {
        if let Some(ref mut def_hist) = self.definition_level_histogram {
            def_hist.update_from_levels(levels);
        }
    }
}

// Metrics per column writer
#[derive(Default)]
struct ColumnMetrics<T: Default> {
    total_bytes_written: u64,
    total_rows_written: u64,
    total_uncompressed_size: u64,
    total_compressed_size: u64,
    total_num_values: u64,
    dictionary_page_offset: Option<u64>,
    data_page_offset: Option<u64>,
    min_column_value: Option<T>,
    max_column_value: Option<T>,
    num_column_nulls: u64,
    column_distinct_count: Option<u64>,
    variable_length_bytes: Option<i64>,
    repetition_level_histogram: Option<LevelHistogram>,
    definition_level_histogram: Option<LevelHistogram>,
}

impl<T: Default> ColumnMetrics<T> {
    fn new() -> Self {
        Default::default()
    }

    /// Initialize the repetition level histogram
    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
        self
    }

    /// Initialize the definition level histogram
    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
        self.definition_level_histogram = LevelHistogram::try_new(max_level);
        self
    }

    /// Sum `page_histogram` into `chunk_histogram`
    fn update_histogram(
        chunk_histogram: &mut Option<LevelHistogram>,
        page_histogram: &Option<LevelHistogram>,
    ) {
        if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
            chunk_hist.add(page_hist);
        }
    }

    /// Sum the provided PageMetrics histograms into the chunk histograms. Does nothing if
    /// page histograms are not initialized.
    fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
        ColumnMetrics::<T>::update_histogram(
            &mut self.definition_level_histogram,
            &page_metrics.definition_level_histogram,
        );
        ColumnMetrics::<T>::update_histogram(
            &mut self.repetition_level_histogram,
            &page_metrics.repetition_level_histogram,
        );
    }

    /// Sum the provided page variable_length_bytes into the chunk variable_length_bytes
    fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
        if let Some(var_bytes) = variable_length_bytes {
            *self.variable_length_bytes.get_or_insert(0) += var_bytes;
        }
    }
}

/// Typed column writer for a primitive column.
pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;

/// Generic column writer for a primitive Parquet column
pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
    // Column writer properties
    descr: ColumnDescPtr,
    props: WriterPropertiesPtr,
    statistics_enabled: EnabledStatistics,

    page_writer: Box<dyn PageWriter + 'a>,
    codec: Compression,
    compressor: Option<Box<dyn Codec>>,
    encoder: E,

    page_metrics: PageMetrics,
    // Metrics per column writer
    column_metrics: ColumnMetrics<E::T>,

    /// The order of encodings within the generated metadata does not impact its meaning,
    /// but we use a BTreeSet so that the output is deterministic
    encodings: BTreeSet<Encoding>,
    encoding_stats: Vec<PageEncodingStats>,
    // Reused buffers
    def_levels_sink: Vec<i16>,
    rep_levels_sink: Vec<i16>,
    data_pages: VecDeque<CompressedPage>,
    // column index and offset index
    column_index_builder: ColumnIndexBuilder,
    offset_index_builder: Option<OffsetIndexBuilder>,

    // The fields below are used to incrementally check boundary order across data pages.
    // We assume they are ascending/descending until proven wrong.
    data_page_boundary_ascending: bool,
    data_page_boundary_descending: bool,
    /// (min, max)
    last_non_null_data_page_min_max: Option<(E::T, E::T)>,
}

impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
    /// Returns a new instance of [`GenericColumnWriter`].
    pub fn new(
        descr: ColumnDescPtr,
        props: WriterPropertiesPtr,
        page_writer: Box<dyn PageWriter + 'a>,
    ) -> Self {
        let codec = props.compression(descr.path());
        let codec_options = CodecOptionsBuilder::default().build();
        let compressor = create_codec(codec, &codec_options).unwrap();
        let encoder = E::try_new(&descr, props.as_ref()).unwrap();

        let statistics_enabled = props.statistics_enabled(descr.path());

        let mut encodings = BTreeSet::new();
        // Used for level information
        encodings.insert(Encoding::RLE);

        let mut page_metrics = PageMetrics::new();
        let mut column_metrics = ColumnMetrics::<E::T>::new();

        // Initialize level histograms if collecting page or chunk statistics
        if statistics_enabled != EnabledStatistics::None {
            page_metrics = page_metrics
                .with_repetition_level_histogram(descr.max_rep_level())
                .with_definition_level_histogram(descr.max_def_level());
            column_metrics = column_metrics
                .with_repetition_level_histogram(descr.max_rep_level())
                .with_definition_level_histogram(descr.max_def_level())
        }

        // Disable column_index_builder if not collecting page statistics.
        let mut column_index_builder = ColumnIndexBuilder::new(descr.physical_type());
        if statistics_enabled != EnabledStatistics::Page {
            column_index_builder.to_invalid()
        }

        // Disable offset_index_builder if requested by user.
        let offset_index_builder = match props.offset_index_disabled() {
            false => Some(OffsetIndexBuilder::new()),
            _ => None,
        };

        Self {
            descr,
            props,
            statistics_enabled,
            page_writer,
            codec,
            compressor,
            encoder,
            def_levels_sink: vec![],
            rep_levels_sink: vec![],
            data_pages: VecDeque::new(),
            page_metrics,
            column_metrics,
            column_index_builder,
            offset_index_builder,
            encodings,
            encoding_stats: vec![],
            data_page_boundary_ascending: true,
            data_page_boundary_descending: true,
            last_non_null_data_page_min_max: None,
        }
    }

    #[allow(clippy::too_many_arguments)]
    pub(crate) fn write_batch_internal(
        &mut self,
        values: &E::Values,
        value_indices: Option<&[usize]>,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
        min: Option<&E::T>,
        max: Option<&E::T>,
        distinct_count: Option<u64>,
    ) -> Result<usize> {
        // Check that the number of definition levels matches the number of repetition levels.
        if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
            if def.len() != rep.len() {
                return Err(general_err!(
                    "Inconsistent length of definition and repetition levels: {} != {}",
                    def.len(),
                    rep.len()
                ));
            }
        }

        // We check for DataPage limits only after we have inserted the values. If a user
        // writes a large number of values, the DataPage size can be well above the limit.
        //
        // The purpose of this chunking is to bound this. Even if a user writes a large
        // number of values, the chunking ensures that we add data pages at a
        // reasonable page size limit.

        // TODO: find out why we don't account for the size of levels when we estimate page
        // size.

        let num_levels = match def_levels {
            Some(def_levels) => def_levels.len(),
            None => values.len(),
        };

        if let Some(min) = min {
            update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
        }
        if let Some(max) = max {
            update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
        }

        // We can only set the distinct count if there are no other writes
        if self.encoder.num_values() == 0 {
            self.column_metrics.column_distinct_count = distinct_count;
        } else {
            self.column_metrics.column_distinct_count = None;
        }

        let mut values_offset = 0;
        let mut levels_offset = 0;
        let base_batch_size = self.props.write_batch_size();
        while levels_offset < num_levels {
            let mut end_offset = num_levels.min(levels_offset + base_batch_size);

            // Split at record boundary
            if let Some(r) = rep_levels {
                while end_offset < r.len() && r[end_offset] != 0 {
                    end_offset += 1;
                }
            }

            values_offset += self.write_mini_batch(
                values,
                values_offset,
                value_indices,
                end_offset - levels_offset,
                def_levels.map(|lv| &lv[levels_offset..end_offset]),
                rep_levels.map(|lv| &lv[levels_offset..end_offset]),
            )?;
            levels_offset = end_offset;
        }

        // Return total number of values processed.
        Ok(values_offset)
    }

    /// Writes a batch of values, definition levels and repetition levels.
    /// Returns the number of values processed (written).
    ///
    /// If definition and repetition levels are provided, the levels are written in full
    /// and this method selects how many values to write (this number is returned), since
    /// the number of actually written values may be smaller than the number of provided
    /// values.
    ///
    /// If only values are provided, then all values are written and the length of
    /// the values buffer is returned.
    ///
    /// Definition and/or repetition levels can be omitted if values are
    /// non-nullable and/or non-repeated.
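    ///
    /// A hedged sketch for an optional (nullable) column, where a definition
    /// level of 1 marks a present value and 0 marks a null (`ignore`d; assumes
    /// `typed` is a `ColumnWriterImpl<'_, Int32Type>`):
    ///
    /// ```ignore
    /// // Four logical slots, two of which are null: [Some(1), None, Some(2), None]
    /// let written = typed.write_batch(&[1, 2], Some(&[1, 0, 1, 0]), None)?;
    /// assert_eq!(written, 2); // only the non-null values are consumed
    /// ```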
    pub fn write_batch(
        &mut self,
        values: &E::Values,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) -> Result<usize> {
        self.write_batch_internal(values, None, def_levels, rep_levels, None, None, None)
    }

    /// Writer may optionally provide pre-calculated statistics for use when computing
    /// chunk-level statistics
    ///
    /// NB: [`WriterProperties::statistics_enabled`] must be set to [`EnabledStatistics::Chunk`]
    /// for these statistics to take effect. If [`EnabledStatistics::None`] they will be ignored,
    /// and if [`EnabledStatistics::Page`] the chunk statistics will instead be derived from the
    /// page statistics
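    ///
    /// A brief sketch (assuming `typed` is a `ColumnWriterImpl<'_, Int32Type>` and
    /// chunk-level statistics are enabled; min/max/distinct count are supplied by
    /// the caller instead of being recomputed):
    ///
    /// ```ignore
    /// typed.write_batch_with_statistics(
    ///     &[3, 1, 2],
    ///     None,     // no definition levels (required column)
    ///     None,     // no repetition levels (non-repeated column)
    ///     Some(&1), // pre-calculated minimum
    ///     Some(&3), // pre-calculated maximum
    ///     Some(3),  // pre-calculated distinct count
    /// )?;
    /// ```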
    pub fn write_batch_with_statistics(
        &mut self,
        values: &E::Values,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
        min: Option<&E::T>,
        max: Option<&E::T>,
        distinct_count: Option<u64>,
    ) -> Result<usize> {
        self.write_batch_internal(
            values,
            None,
            def_levels,
            rep_levels,
            min,
            max,
            distinct_count,
        )
    }

    /// Returns the estimated total memory usage.
    ///
    /// Unlike [`Self::get_estimated_total_bytes`] this is an estimate
    /// of the current memory usage and not the final anticipated encoded size.
    #[cfg(feature = "arrow")]
    pub(crate) fn memory_size(&self) -> usize {
        self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
    }

    /// Returns the total number of bytes written by this column writer so far.
    /// This value is also returned when the column writer is closed.
    ///
    /// Note: this value does not include any buffered data that has not
    /// yet been flushed to a page.
    pub fn get_total_bytes_written(&self) -> u64 {
        self.column_metrics.total_bytes_written
    }

    /// Returns the estimated total encoded bytes for this column writer.
    ///
    /// Unlike [`Self::get_total_bytes_written`] this includes an estimate
    /// of any data that has not yet been flushed to a page, based on its
    /// anticipated encoded size.
    #[cfg(feature = "arrow")]
    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
        self.data_pages
            .iter()
            .map(|page| page.data().len() as u64)
            .sum::<u64>()
            + self.column_metrics.total_bytes_written
            + self.encoder.estimated_data_page_size() as u64
            + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64
    }

    /// Returns the total number of rows written by this column writer so far.
    /// This value is also returned when the column writer is closed.
    pub fn get_total_rows_written(&self) -> u64 {
        self.column_metrics.total_rows_written
    }

    /// Returns a reference to a [`ColumnDescPtr`]
    pub fn get_descriptor(&self) -> &ColumnDescPtr {
        &self.descr
    }

    /// Finalizes writes and closes the column writer.
    /// Returns total bytes written, total rows written and column chunk metadata.
    pub fn close(mut self) -> Result<ColumnCloseResult> {
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }
        if self.encoder.has_dictionary() {
            self.write_dictionary_page()?;
        }
        self.flush_data_pages()?;
        let metadata = self.build_column_metadata()?;
        self.page_writer.close()?;

        let boundary_order = match (
            self.data_page_boundary_ascending,
            self.data_page_boundary_descending,
        ) {
            // If the lists are composed of equal elements then the order will be marked as
            // ascending (this is also the case if all pages are null pages)
            (true, _) => BoundaryOrder::ASCENDING,
            (false, true) => BoundaryOrder::DESCENDING,
            (false, false) => BoundaryOrder::UNORDERED,
        };
        self.column_index_builder.set_boundary_order(boundary_order);

        let column_index = match self.column_index_builder.valid() {
            true => Some(self.column_index_builder.build()?),
            false => None,
        };

        let offset_index = self.offset_index_builder.map(|b| b.build());

        Ok(ColumnCloseResult {
            bytes_written: self.column_metrics.total_bytes_written,
            rows_written: self.column_metrics.total_rows_written,
            bloom_filter: self.encoder.flush_bloom_filter(),
            metadata,
            column_index,
            offset_index,
        })
    }

    /// Writes a mini batch of values, definition and repetition levels.
    /// This allows fine-grained processing of values while maintaining a reasonable
    /// page size.
    fn write_mini_batch(
        &mut self,
        values: &E::Values,
        values_offset: usize,
        value_indices: Option<&[usize]>,
        num_levels: usize,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) -> Result<usize> {
        // Process definition levels and determine how many values to write.
        let values_to_write = if self.descr.max_def_level() > 0 {
            let levels = def_levels.ok_or_else(|| {
                general_err!(
                    "Definition levels are required, because max definition level = {}",
                    self.descr.max_def_level()
                )
            })?;

            let values_to_write = levels
                .iter()
                .map(|level| (*level == self.descr.max_def_level()) as usize)
                .sum();
            self.page_metrics.num_page_nulls += (levels.len() - values_to_write) as u64;

            // Update histogram
            self.page_metrics.update_definition_level_histogram(levels);

            self.def_levels_sink.extend_from_slice(levels);
            values_to_write
        } else {
            num_levels
        };

        // Process repetition levels and determine how many rows we are about to process.
        if self.descr.max_rep_level() > 0 {
            // A row could contain more than one value.
            let levels = rep_levels.ok_or_else(|| {
                general_err!(
                    "Repetition levels are required, because max repetition level = {}",
                    self.descr.max_rep_level()
                )
            })?;

            if !levels.is_empty() && levels[0] != 0 {
                return Err(general_err!(
                    "Write must start at a record boundary, got non-zero repetition level of {}",
                    levels[0]
                ));
            }

            // Count the occasions where we start a new row
            for &level in levels {
                self.page_metrics.num_buffered_rows += (level == 0) as u32
            }

            // Update histogram
            self.page_metrics.update_repetition_level_histogram(levels);

            self.rep_levels_sink.extend_from_slice(levels);
        } else {
            // Each value is exactly one row.
            // The row count equals the number of values; nulls are counted as well.
            self.page_metrics.num_buffered_rows += num_levels as u32;
        }

        match value_indices {
            Some(indices) => {
                let indices = &indices[values_offset..values_offset + values_to_write];
                self.encoder.write_gather(values, indices)?;
            }
            None => self.encoder.write(values, values_offset, values_to_write)?,
        }

        self.page_metrics.num_buffered_values += num_levels as u32;

        if self.should_add_data_page() {
            self.add_data_page()?;
        }

        if self.should_dict_fallback() {
            self.dict_fallback()?;
        }

        Ok(values_to_write)
    }

    /// Returns true if we need to fall back to non-dictionary encoding.
    ///
    /// We can only fall back if the dictionary encoder is set and we have exceeded the
    /// dictionary size.
    #[inline]
    fn should_dict_fallback(&self) -> bool {
        match self.encoder.estimated_dict_page_size() {
            Some(size) => {
                size >= self
                    .props
                    .column_dictionary_page_size_limit(self.descr.path())
            }
            None => false,
        }
    }

    /// Returns true if there is enough data for a data page, false otherwise.
    #[inline]
    fn should_add_data_page(&self) -> bool {
        // This is necessary in the event of a much larger dictionary size than page size
        //
        // In such a scenario the dictionary decoder may return an estimated encoded
        // size in excess of the page size limit, even when there are no buffered values
        if self.page_metrics.num_buffered_values == 0 {
            return false;
        }

        self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
            || self.encoder.estimated_data_page_size() >= self.props.data_page_size_limit()
    }

    /// Performs dictionary fallback.
    /// Prepares and writes dictionary and all data pages into page writer.
    fn dict_fallback(&mut self) -> Result<()> {
        // At this point we know that we need to fall back.
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }
        self.write_dictionary_page()?;
        self.flush_data_pages()?;
        Ok(())
    }

    /// Update the column index and offset index when adding the data page
    fn update_column_offset_index(
        &mut self,
        page_statistics: Option<&ValueStatistics<E::T>>,
        page_variable_length_bytes: Option<i64>,
    ) {
        // update the column index
        let null_page =
            (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
        // The page contains only null values, so writers have to set the corresponding
        // entries in min_values and max_values to byte[0]
        if null_page && self.column_index_builder.valid() {
            self.column_index_builder.append(
                null_page,
                vec![],
                vec![],
                self.page_metrics.num_page_nulls as i64,
            );
        } else if self.column_index_builder.valid() {
            // From the page statistics.
            // If the page statistics are unavailable, invalidate the column/offset index for
            // this column chunk.
            match &page_statistics {
                None => {
                    self.column_index_builder.to_invalid();
                }
                Some(stat) => {
                    // Check if min/max are still ascending/descending across pages
                    let new_min = stat.min_opt().unwrap();
                    let new_max = stat.max_opt().unwrap();
                    if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
                        if self.data_page_boundary_ascending {
                            // If the last min/max are greater than the new min/max, the pages
                            // are no longer ascending
                            let not_ascending = compare_greater(&self.descr, last_min, new_min)
                                || compare_greater(&self.descr, last_max, new_max);
                            if not_ascending {
                                self.data_page_boundary_ascending = false;
                            }
                        }

                        if self.data_page_boundary_descending {
                            // If the new min/max are greater than the last min/max, the pages
                            // are no longer descending
                            let not_descending = compare_greater(&self.descr, new_min, last_min)
                                || compare_greater(&self.descr, new_max, last_max);
                            if not_descending {
                                self.data_page_boundary_descending = false;
                            }
                        }
                    }
                    self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));

                    if self.can_truncate_value() {
                        self.column_index_builder.append(
                            null_page,
                            self.truncate_min_value(
                                self.props.column_index_truncate_length(),
                                stat.min_bytes_opt().unwrap(),
                            )
                            .0,
                            self.truncate_max_value(
                                self.props.column_index_truncate_length(),
                                stat.max_bytes_opt().unwrap(),
                            )
                            .0,
                            self.page_metrics.num_page_nulls as i64,
                        );
                    } else {
                        self.column_index_builder.append(
                            null_page,
                            stat.min_bytes_opt().unwrap().to_vec(),
                            stat.max_bytes_opt().unwrap().to_vec(),
                            self.page_metrics.num_page_nulls as i64,
                        );
                    }
                }
            }
        }

        // Append page histograms to the `ColumnIndex` histograms
        self.column_index_builder.append_histograms(
            &self.page_metrics.repetition_level_histogram,
            &self.page_metrics.definition_level_histogram,
        );

        // Update the offset index
        if let Some(builder) = self.offset_index_builder.as_mut() {
            builder.append_row_count(self.page_metrics.num_buffered_rows as i64);
            builder.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
        }
    }

    /// Determine if we should allow truncating min/max values for this column's statistics
    fn can_truncate_value(&self) -> bool {
        match self.descr.physical_type() {
            // Don't truncate for Float16 and Decimal because their sort order is different
            // from that of FIXED_LEN_BYTE_ARRAY sort order.
            // So truncation of those types could lead to inaccurate min/max statistics
            Type::FIXED_LEN_BYTE_ARRAY
                if !matches!(
                    self.descr.logical_type(),
                    Some(LogicalType::Decimal { .. }) | Some(LogicalType::Float16)
                ) =>
            {
                true
            }
            Type::BYTE_ARRAY => true,
            // Truncation only applies for fba/binary physical types
            _ => false,
        }
    }

    /// Returns `true` if this column's logical type is a UTF-8 string.
    fn is_utf8(&self) -> bool {
        self.get_descriptor().logical_type() == Some(LogicalType::String)
            || self.get_descriptor().converted_type() == ConvertedType::UTF8
    }

    /// Truncates a binary statistic to at most `truncation_length` bytes.
    ///
    /// If truncation is not possible, returns `data`.
    ///
    /// The `bool` in the returned tuple indicates whether truncation occurred or not.
    ///
    /// UTF-8 Note:
    /// If the column type indicates UTF-8, and `data` contains valid UTF-8, then the result will
    /// also remain valid UTF-8, but may be shorter than `truncation_length` bytes to avoid
    /// splitting on non-character boundaries.
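    ///
    /// For example (illustrative, assuming a UTF-8 column): truncating the 5-byte string
    /// `"été"` to 3 bytes yields `"ét"` (3 bytes, a valid character boundary), while
    /// truncating it to 2 bytes yields `"é"` rather than splitting the second `é`.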
    fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
        truncation_length
            .filter(|l| data.len() > *l)
            .and_then(|l|
                // don't do extra work if this column isn't UTF-8
                if self.is_utf8() {
                    match str::from_utf8(data) {
                        Ok(str_data) => truncate_utf8(str_data, l),
                        Err(_) => Some(data[..l].to_vec()),
                    }
                } else {
                    Some(data[..l].to_vec())
                }
            )
            .map(|truncated| (truncated, true))
            .unwrap_or_else(|| (data.to_vec(), false))
    }

    /// Truncates a binary statistic to at most `truncation_length` bytes, and then increments
    /// the final byte(s) to yield a valid upper bound. This may produce a result shorter than
    /// `truncation_length` bytes if the last byte(s) overflow.
    ///
    /// If truncation is not possible, returns `data`.
    ///
    /// The `bool` in the returned tuple indicates whether truncation occurred or not.
    ///
    /// UTF-8 Note:
    /// If the column type indicates UTF-8, and `data` contains valid UTF-8, then the result will
    /// also remain valid UTF-8 (but again may be shorter than `truncation_length` bytes). If
    /// `data` does not contain valid UTF-8, then truncation will occur as if the column is
    /// non-string binary.
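    ///
    /// For example (illustrative): truncating the max `b"abcz"` to 2 bytes keeps `b"ab"` and
    /// increments the final byte, yielding `b"ac"`, which is still an upper bound for any
    /// value starting with `b"ab"`.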
    fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
        truncation_length
            .filter(|l| data.len() > *l)
            .and_then(|l|
                // don't do extra work if this column isn't UTF-8
                if self.is_utf8() {
                    match str::from_utf8(data) {
                        Ok(str_data) => truncate_and_increment_utf8(str_data, l),
                        Err(_) => increment(data[..l].to_vec()),
                    }
                } else {
                    increment(data[..l].to_vec())
                }
            )
            .map(|truncated| (truncated, true))
            .unwrap_or_else(|| (data.to_vec(), false))
    }

    /// Truncate the min and max values that will be written to a data page
    /// header or column chunk Statistics
    fn truncate_statistics(&self, statistics: Statistics) -> Statistics {
        let backwards_compatible_min_max = self.descr.sort_order().is_signed();
        match statistics {
            Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
                let (min, did_truncate_min) = self.truncate_min_value(
                    self.props.statistics_truncate_length(),
                    stats.min_bytes_opt().unwrap(),
                );
                let (max, did_truncate_max) = self.truncate_max_value(
                    self.props.statistics_truncate_length(),
                    stats.max_bytes_opt().unwrap(),
                );
                Statistics::ByteArray(
                    ValueStatistics::new(
                        Some(min.into()),
                        Some(max.into()),
                        stats.distinct_count(),
                        stats.null_count_opt(),
                        backwards_compatible_min_max,
                    )
                    .with_max_is_exact(!did_truncate_max)
                    .with_min_is_exact(!did_truncate_min),
                )
            }
            Statistics::FixedLenByteArray(stats)
                if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
            {
                let (min, did_truncate_min) = self.truncate_min_value(
                    self.props.statistics_truncate_length(),
                    stats.min_bytes_opt().unwrap(),
                );
                let (max, did_truncate_max) = self.truncate_max_value(
                    self.props.statistics_truncate_length(),
                    stats.max_bytes_opt().unwrap(),
                );
                Statistics::FixedLenByteArray(
                    ValueStatistics::new(
                        Some(min.into()),
                        Some(max.into()),
                        stats.distinct_count(),
                        stats.null_count_opt(),
                        backwards_compatible_min_max,
                    )
                    .with_max_is_exact(!did_truncate_max)
                    .with_min_is_exact(!did_truncate_min),
                )
            }
            stats => stats,
        }
    }

    /// Adds a data page.
    /// The data page is either buffered (when dictionary encoding is used) or written directly.
    fn add_data_page(&mut self) -> Result<()> {
        // Extract encoded values
        let values_data = self.encoder.flush_data_page()?;

        let max_def_level = self.descr.max_def_level();
        let max_rep_level = self.descr.max_rep_level();

        self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;

        let page_statistics = match (values_data.min_value, values_data.max_value) {
            (Some(min), Some(max)) => {
                // Update chunk level statistics
                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);

                (self.statistics_enabled == EnabledStatistics::Page).then_some(
                    ValueStatistics::new(
                        Some(min),
                        Some(max),
                        None,
                        Some(self.page_metrics.num_page_nulls),
                        false,
                    ),
                )
            }
            _ => None,
        };

        // update column and offset index
        self.update_column_offset_index(
            page_statistics.as_ref(),
            values_data.variable_length_bytes,
        );

        // Update histograms and variable_length_bytes in column_metrics
        self.column_metrics
            .update_from_page_metrics(&self.page_metrics);
        self.column_metrics
            .update_variable_length_bytes(values_data.variable_length_bytes);

        // From here on, we only need page statistics if they will be written to the page header.
        let page_statistics = page_statistics
            .filter(|_| self.props.write_page_header_statistics(self.descr.path()))
            .map(|stats| self.truncate_statistics(Statistics::from(stats)));

        let compressed_page = match self.props.writer_version() {
            WriterVersion::PARQUET_1_0 => {
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.rep_levels_sink[..],
                            max_rep_level,
                        )[..],
                    );
                }

                if max_def_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.def_levels_sink[..],
                            max_def_level,
                        )[..],
                    );
                }

                buffer.extend_from_slice(&values_data.buf);
                let uncompressed_size = buffer.len();

                if let Some(ref mut cmpr) = self.compressor {
                    let mut compressed_buf = Vec::with_capacity(uncompressed_size);
                    cmpr.compress(&buffer[..], &mut compressed_buf)?;
                    compressed_buf.shrink_to_fit();
                    buffer = compressed_buf;
                }

                let data_page = Page::DataPage {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    def_level_encoding: Encoding::RLE,
                    rep_level_encoding: Encoding::RLE,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
            WriterVersion::PARQUET_2_0 => {
                let mut rep_levels_byte_len = 0;
                let mut def_levels_byte_len = 0;
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    let levels = self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
                    rep_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                if max_def_level > 0 {
                    let levels = self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
                    def_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                let uncompressed_size =
                    rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();

                // Data Page v2 compresses values only.
                let is_compressed = match self.compressor {
                    Some(ref mut cmpr) => {
                        let buffer_len = buffer.len();
                        cmpr.compress(&values_data.buf, &mut buffer)?;
                        if uncompressed_size <= buffer.len() - buffer_len {
                            buffer.truncate(buffer_len);
                            buffer.extend_from_slice(&values_data.buf);
                            false
                        } else {
                            true
                        }
                    }
                    None => {
                        buffer.extend_from_slice(&values_data.buf);
                        false
                    }
                };

                let data_page = Page::DataPageV2 {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    num_nulls: self.page_metrics.num_page_nulls as u32,
                    num_rows: self.page_metrics.num_buffered_rows,
                    def_levels_byte_len: def_levels_byte_len as u32,
                    rep_levels_byte_len: rep_levels_byte_len as u32,
                    is_compressed,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
        };

        // Check if we need to buffer the data page or flush it to the sink directly.
        if self.encoder.has_dictionary() {
            self.data_pages.push_back(compressed_page);
        } else {
            self.write_data_page(compressed_page)?;
        }

        // Update total number of rows.
        self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;

        // Reset state.
        self.rep_levels_sink.clear();
        self.def_levels_sink.clear();
        self.page_metrics.new_page();

        Ok(())
    }

    /// Finalises any outstanding data pages and flushes buffered data pages from
    /// dictionary encoding into the underlying sink.
    #[inline]
    fn flush_data_pages(&mut self) -> Result<()> {
        // Write all outstanding data to a new page.
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }

        while let Some(page) = self.data_pages.pop_front() {
            self.write_data_page(page)?;
        }

        Ok(())
    }

    /// Assembles column chunk metadata.
    fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
        let total_compressed_size = self.column_metrics.total_compressed_size as i64;
        let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
        let num_values = self.column_metrics.total_num_values as i64;
        let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
        // If data page offset is not set, then no pages have been written
        let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;

        let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
            .set_compression(self.codec)
            .set_encodings_mask(EncodingMask::new_from_encodings(self.encodings.iter()))
            .set_page_encoding_stats(self.encoding_stats.clone())
            .set_total_compressed_size(total_compressed_size)
            .set_total_uncompressed_size(total_uncompressed_size)
            .set_num_values(num_values)
            .set_data_page_offset(data_page_offset)
            .set_dictionary_page_offset(dict_page_offset);

        if self.statistics_enabled != EnabledStatistics::None {
            let backwards_compatible_min_max = self.descr.sort_order().is_signed();

            let statistics = ValueStatistics::<E::T>::new(
                self.column_metrics.min_column_value.clone(),
                self.column_metrics.max_column_value.clone(),
                self.column_metrics.column_distinct_count,
                Some(self.column_metrics.num_column_nulls),
                false,
            )
            .with_backwards_compatible_min_max(backwards_compatible_min_max)
            .into();

            let statistics = self.truncate_statistics(statistics);

            builder = builder
                .set_statistics(statistics)
                .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
                .set_repetition_level_histogram(
                    self.column_metrics.repetition_level_histogram.take(),
                )
                .set_definition_level_histogram(
                    self.column_metrics.definition_level_histogram.take(),
                );

            if let Some(geo_stats) = self.encoder.flush_geospatial_statistics() {
                builder = builder.set_geo_statistics(geo_stats);
            }
        }

        builder = self.set_column_chunk_encryption_properties(builder);

        let metadata = builder.build()?;
        Ok(metadata)
    }

    /// Encodes definition or repetition levels for Data Page v1.
    #[inline]
    fn encode_levels_v1(&self, encoding: Encoding, levels: &[i16], max_level: i16) -> Vec<u8> {
        let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
        encoder.put(levels);
        encoder.consume()
    }

    /// Encodes definition or repetition levels for Data Page v2.
    /// Encoding is always RLE.
    #[inline]
    fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
        let mut encoder = LevelEncoder::v2(max_level, levels.len());
        encoder.put(levels);
        encoder.consume()
    }

    /// Writes a compressed data page into the underlying sink and updates global metrics.
    #[inline]
    fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
        self.encodings.insert(page.encoding());
        match self.encoding_stats.last_mut() {
            Some(encoding_stats)
                if encoding_stats.page_type == page.page_type()
                    && encoding_stats.encoding == page.encoding() =>
            {
                encoding_stats.count += 1;
            }
            _ => {
                // data page type does not change inside a file
                // encoding can currently only change from dictionary to non-dictionary once
                self.encoding_stats.push(PageEncodingStats {
                    page_type: page.page_type(),
                    encoding: page.encoding(),
                    count: 1,
                });
            }
        }
        let page_spec = self.page_writer.write_page(page)?;
        // update offset index
        // compressed_size = header_size + compressed_data_size
        if let Some(builder) = self.offset_index_builder.as_mut() {
            builder
                .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32)
        }
        self.update_metrics_for_page(page_spec);
        Ok(())
    }

    /// Writes a dictionary page into the underlying sink.
    #[inline]
    fn write_dictionary_page(&mut self) -> Result<()> {
        let compressed_page = {
            let mut page = self
                .encoder
                .flush_dict_page()?
                .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;

            let uncompressed_size = page.buf.len();

            if let Some(ref mut cmpr) = self.compressor {
                let mut output_buf = Vec::with_capacity(uncompressed_size);
                cmpr.compress(&page.buf, &mut output_buf)?;
                page.buf = Bytes::from(output_buf);
            }

            let dict_page = Page::DictionaryPage {
                buf: page.buf,
                num_values: page.num_values as u32,
                encoding: self.props.dictionary_page_encoding(),
                is_sorted: page.is_sorted,
            };
            CompressedPage::new(dict_page, uncompressed_size)
        };

        self.encodings.insert(compressed_page.encoding());
        self.encoding_stats.push(PageEncodingStats {
            page_type: PageType::DICTIONARY_PAGE,
            encoding: compressed_page.encoding(),
            count: 1,
        });
        let page_spec = self.page_writer.write_page(compressed_page)?;
        self.update_metrics_for_page(page_spec);
        // For the dictionary page, we don't need to update the column/offset index.
        Ok(())
    }
1323
    /// Updates column writer metrics with metadata from each written page.
    #[inline]
    fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
        self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
        self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
        self.column_metrics.total_bytes_written += page_spec.bytes_written;

        match page_spec.page_type {
            PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
                self.column_metrics.total_num_values += page_spec.num_values as u64;
                if self.column_metrics.data_page_offset.is_none() {
                    self.column_metrics.data_page_offset = Some(page_spec.offset);
                }
            }
            PageType::DICTIONARY_PAGE => {
                assert!(
                    self.column_metrics.dictionary_page_offset.is_none(),
                    "Dictionary offset is already set"
                );
                self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
            }
            _ => {}
        }
    }

    #[inline]
    #[cfg(feature = "encryption")]
    fn set_column_chunk_encryption_properties(
        &self,
        builder: ColumnChunkMetaDataBuilder,
    ) -> ColumnChunkMetaDataBuilder {
        if let Some(encryption_properties) = self.props.file_encryption_properties.as_ref() {
            builder.set_column_crypto_metadata(get_column_crypto_metadata(
                encryption_properties,
                &self.descr,
            ))
        } else {
            builder
        }
    }

    #[inline]
    #[cfg(not(feature = "encryption"))]
    fn set_column_chunk_encryption_properties(
        &self,
        builder: ColumnChunkMetaDataBuilder,
    ) -> ColumnChunkMetaDataBuilder {
        builder
    }
}

fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
    update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val))
}

fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) {
    update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur))
}

#[inline]
#[allow(clippy::eq_op)]
fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
    match T::PHYSICAL_TYPE {
        Type::FLOAT | Type::DOUBLE => val != val,
        Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type() == Some(LogicalType::Float16) => {
            let val = val.as_bytes();
            let val = f16::from_le_bytes([val[0], val[1]]);
            val.is_nan()
        }
        _ => false,
    }
}

/// Perform a conditional update of `cur`, skipping any NaN values
///
/// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with
/// the value of `cur`, and updates `cur` to `Some(val)` if it returns `true`
fn update_stat<T: ParquetValueType, F>(
    descr: &ColumnDescriptor,
    val: &T,
    cur: &mut Option<T>,
    should_update: F,
) where
    F: Fn(&T) -> bool,
{
    if is_nan(descr, val) {
        return;
    }

    if cur.as_ref().is_none_or(should_update) {
        *cur = Some(val.clone());
    }
}

/// Evaluate `a > b` according to the underlying logical type.
fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
    match T::PHYSICAL_TYPE {
        Type::INT32 | Type::INT64 => {
            if let Some(LogicalType::Integer {
                is_signed: false, ..
            }) = descr.logical_type()
            {
                // need to compare unsigned
                return compare_greater_unsigned_int(a, b);
            }

            match descr.converted_type() {
                ConvertedType::UINT_8
                | ConvertedType::UINT_16
                | ConvertedType::UINT_32
                | ConvertedType::UINT_64 => {
                    return compare_greater_unsigned_int(a, b);
                }
                _ => {}
            };
        }
        Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
            if let Some(LogicalType::Decimal { .. }) = descr.logical_type() {
                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
            }
            if let ConvertedType::DECIMAL = descr.converted_type() {
                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
            }
            if let Some(LogicalType::Float16) = descr.logical_type() {
                return compare_greater_f16(a.as_bytes(), b.as_bytes());
            }
        }

        _ => {}
    }

    // compare independent of logical / converted type
    a > b
}

// ----------------------------------------------------------------------
// Encoding support for column writer.
// This mirrors parquet-mr default encodings for writes. See:
// https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java
// https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java

/// Returns encoding for a column when no other encoding is provided in writer properties.
fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
    match (kind, props.writer_version()) {
        (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
        (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
        (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
        (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
        _ => Encoding::PLAIN,
    }
}

/// Returns true if dictionary is supported for column writer, false otherwise.
fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
    match (kind, props.writer_version()) {
        // Booleans do not support dict encoding and should use a fallback encoding.
        (Type::BOOLEAN, _) => false,
        // Dictionary encoding of FIXED_LEN_BYTE_ARRAY was not supported in PARQUET 1.0
        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
        _ => true,
    }
}
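
// Illustrative sketch, not part of the original test suite: spot-check a few of the
// (physical type, writer version) defaults encoded in the two functions above.
#[test]
fn check_default_encoding_fallbacks_sketch() {
    let v1 = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_1_0)
        .build();
    let v2 = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .build();

    // V1 always falls back to PLAIN; V2 prefers the delta encodings.
    assert_eq!(fallback_encoding(Type::INT32, &v1), Encoding::PLAIN);
    assert_eq!(fallback_encoding(Type::INT32, &v2), Encoding::DELTA_BINARY_PACKED);
    assert_eq!(fallback_encoding(Type::BYTE_ARRAY, &v2), Encoding::DELTA_BYTE_ARRAY);

    // Booleans never use the dictionary; FIXED_LEN_BYTE_ARRAY only gained support in V2.
    assert!(!has_dictionary_support(Type::BOOLEAN, &v2));
    assert!(!has_dictionary_support(Type::FIXED_LEN_BYTE_ARRAY, &v1));
    assert!(has_dictionary_support(Type::FIXED_LEN_BYTE_ARRAY, &v2));
}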

#[inline]
fn compare_greater_unsigned_int<T: ParquetValueType>(a: &T, b: &T) -> bool {
    a.as_u64().unwrap() > b.as_u64().unwrap()
}

#[inline]
fn compare_greater_f16(a: &[u8], b: &[u8]) -> bool {
    let a = f16::from_le_bytes(a.try_into().unwrap());
    let b = f16::from_le_bytes(b.try_into().unwrap());
    a > b
}
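
// Illustrative sketch, not part of the original test suite: `compare_greater_f16`
// decodes the two little-endian bytes into an `f16` before comparing, so the result
// follows numeric order rather than raw byte order.
#[test]
fn check_compare_greater_f16_sketch() {
    let one = f16::ONE.to_le_bytes();
    let two = f16::from_f32(2.0).to_le_bytes();
    assert!(compare_greater_f16(&two, &one));
    assert!(!compare_greater_f16(&one, &two));
}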

/// Signed comparison of byte arrays
fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
    let a_length = a.len();
    let b_length = b.len();

    if a_length == 0 || b_length == 0 {
        return a_length > 0;
    }

    let first_a: u8 = a[0];
    let first_b: u8 = b[0];

    // We can short circuit for different signed numbers or
    // for equal length byte arrays that have different first bytes.
    // The equality requirement is necessary for sign extension cases.
    // 0xFF90 should be equal to 0x90 (due to big endian sign extension).
    if (0x80 & first_a) != (0x80 & first_b) || (a_length == b_length && first_a != first_b) {
        return (first_a as i8) > (first_b as i8);
    }

    // When the lengths are unequal and the numbers are of the same
    // sign we need to do comparison by sign extending the shorter
    // value first, and once we get to equal sized arrays, lexicographical
    // unsigned comparison of everything but the first byte is sufficient.

    let extension: u8 = if (first_a as i8) < 0 { 0xFF } else { 0 };

    if a_length != b_length {
        let not_equal = if a_length > b_length {
            let lead_length = a_length - b_length;
            a[0..lead_length].iter().any(|&x| x != extension)
        } else {
            let lead_length = b_length - a_length;
            b[0..lead_length].iter().any(|&x| x != extension)
        };

        if not_equal {
            let negative_values: bool = (first_a as i8) < 0;
            let a_longer: bool = a_length > b_length;
            return if negative_values { !a_longer } else { a_longer };
        }
    }

    (a[1..]) > (b[1..])
}
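
// Illustrative sketch, not part of the original test suite: decimals are stored as
// big-endian two's complement, so [0x01, 0x00] is 256, [0xFF] is -1, and a negative
// value always compares below a positive one regardless of byte length.
#[test]
fn check_compare_greater_byte_array_decimals_sketch() {
    // 256 > 2, even though the shorter array has the larger first byte
    assert!(compare_greater_byte_array_decimals(&[0x01, 0x00], &[0x02]));
    // -1 > -2 (equal length, different first bytes, both negative)
    assert!(compare_greater_byte_array_decimals(&[0xFF], &[0xFE]));
    // 1 > -1: different signs short circuit on the sign bit
    assert!(compare_greater_byte_array_decimals(&[0x01], &[0xFF]));
    assert!(!compare_greater_byte_array_decimals(&[0xFF], &[0x01]));
}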

/// Truncate a UTF-8 slice to the longest prefix that is still a valid UTF-8 string,
/// while being no more than `length` bytes and non-empty. Returns `None` if truncation
/// is not possible within those constraints.
///
/// The caller guarantees that `data.len() > length`.
fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    let split = (1..=length).rfind(|x| data.is_char_boundary(*x))?;
    Some(data.as_bytes()[..split].to_vec())
}
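
// Illustrative sketch, not part of the original test suite: truncation backs up to the
// nearest character boundary, so a multi-byte character is dropped whole, never split.
#[test]
fn check_truncate_utf8_sketch() {
    // "€" is 3 bytes, so truncating the 5-byte "ab€" to 4 bytes backs up to "ab"
    assert_eq!(truncate_utf8("ab€", 4), Some(b"ab".to_vec()));
    // a single 3-byte character cannot be truncated to 2 bytes and stay non-empty
    assert_eq!(truncate_utf8("€", 2), None);
}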

/// Truncate a UTF-8 slice and increment its final character. The returned value is the
/// longest such slice that is still a valid UTF-8 string while being no more than `length`
/// bytes and non-empty. Returns `None` if no such transformation is possible.
///
/// The caller guarantees that `data.len() > length`.
fn truncate_and_increment_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    // UTF-8 is max 4 bytes, so start search 3 back from desired length
    let lower_bound = length.saturating_sub(3);
    let split = (lower_bound..=length).rfind(|x| data.is_char_boundary(*x))?;
    increment_utf8(data.get(..split)?)
}
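
// Illustrative sketch, not part of the original test suite: truncate, then increment the
// final character, yielding a short exclusive upper bound for the truncated prefix.
#[test]
fn check_truncate_and_increment_utf8_sketch() {
    assert_eq!(truncate_and_increment_utf8("abcdef", 3), Some(b"abd".to_vec()));
}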

/// Increment the final character in a UTF-8 string in such a way that the returned result
/// is still a valid UTF-8 string. The returned string may be shorter than the input if the
/// last character(s) cannot be incremented (due to overflow or producing invalid code points).
/// Returns `None` if the string cannot be incremented.
///
/// Note that this implementation will not promote an N-byte code point to (N+1) bytes.
fn increment_utf8(data: &str) -> Option<Vec<u8>> {
    for (idx, original_char) in data.char_indices().rev() {
        let original_len = original_char.len_utf8();
        if let Some(next_char) = char::from_u32(original_char as u32 + 1) {
            // do not allow increasing byte width of incremented char
            if next_char.len_utf8() == original_len {
                let mut result = data.as_bytes()[..idx + original_len].to_vec();
                next_char.encode_utf8(&mut result[idx..]);
                return Some(result);
            }
        }
    }

    None
}
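
// Illustrative sketch, not part of the original test suite: the common case bumps the
// final character, while a character whose successor would be wider is dropped and the
// increment carries into the previous one.
#[test]
fn check_increment_utf8_sketch() {
    // simple case: 'c' -> 'd'
    assert_eq!(increment_utf8("abc"), Some(b"abd".to_vec()));
    // U+007F + 1 = U+0080 needs two bytes, so the carry moves left: "a\u{7F}" -> "b"
    assert_eq!(increment_utf8("a\u{7F}"), Some(b"b".to_vec()));
}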

/// Try and increment the bytes from right to left.
///
/// Returns `None` if all bytes are set to `u8::MAX`.
fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
    for byte in data.iter_mut().rev() {
        let (incremented, overflow) = byte.overflowing_add(1);
        *byte = incremented;

        if !overflow {
            return Some(data);
        }
    }

    None
}
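
// Illustrative sketch, not part of the original test suite: the carry ripples leftward,
// wrapping overflowed bytes to 0x00; only an all-0xFF input has no successor.
#[test]
fn check_increment_sketch() {
    assert_eq!(increment(vec![0x00, 0xFF]), Some(vec![0x01, 0x00]));
    assert_eq!(increment(vec![0xFF, 0xFF]), None);
}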

#[cfg(test)]
mod tests {
    use crate::{
        file::{properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, writer::SerializedFileWriter},
        schema::parser::parse_message_type,
    };
    use core::str;
    use rand::distr::uniform::SampleUniform;
    use std::{fs::File, sync::Arc};

    use crate::column::{
        page::PageReader,
        reader::{ColumnReaderImpl, get_column_reader, get_typed_column_reader},
    };
    use crate::file::writer::TrackedWrite;
    use crate::file::{
        properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
    };
    use crate::schema::types::{ColumnPath, Type as SchemaType};
    use crate::util::test_common::rand_gen::random_numbers_range;

    use super::*;

    #[test]
    fn test_column_writer_inconsistent_def_rep_length() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 1, props);
        let res = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0]));
        assert!(res.is_err());
        if let Err(err) = res {
            assert_eq!(
                format!("{err}"),
                "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2"
            );
        }
    }

    #[test]
    fn test_column_writer_invalid_def_levels() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
        assert!(res.is_err());
        if let Err(err) = res {
            assert_eq!(
                format!("{err}"),
                "Parquet error: Definition levels are required, because max definition level = 1"
            );
        }
    }

    #[test]
    fn test_column_writer_invalid_rep_levels() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 1, props);
        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
        assert!(res.is_err());
        if let Err(err) = res {
            assert_eq!(
                format!("{err}"),
                "Parquet error: Repetition levels are required, because max repetition level = 1"
            );
        }
    }

    #[test]
    fn test_column_writer_not_enough_values_to_write() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
        let res = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None);
        assert!(res.is_err());
        if let Err(err) = res {
            assert_eq!(
                format!("{err}"),
                "Parquet error: Expected to write 4 values, but have only 2"
            );
        }
    }

    #[test]
    fn test_column_writer_write_only_one_dictionary_page() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
        // First page should be correctly written.
        writer.add_data_page().unwrap();
        writer.write_dictionary_page().unwrap();
        let err = writer.write_dictionary_page().unwrap_err().to_string();
        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
    }

    #[test]
    fn test_column_writer_error_when_writing_disabled_dictionary() {
        let page_writer = get_test_page_writer();
        let props = Arc::new(
            WriterProperties::builder()
                .set_dictionary_enabled(false)
                .build(),
        );
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
        let err = writer.write_dictionary_page().unwrap_err().to_string();
        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
    }

    #[test]
    fn test_column_writer_boolean_type_does_not_support_dictionary() {
        let page_writer = get_test_page_writer();
        let props = Arc::new(
            WriterProperties::builder()
                .set_dictionary_enabled(true)
                .build(),
        );
        let mut writer = get_test_column_writer::<BoolType>(page_writer, 0, 0, props);
        writer
            .write_batch(&[true, false, true, false], None, None)
            .unwrap();

        let r = writer.close().unwrap();
        // PlainEncoder uses bit writer to write boolean values, which all fit into 1
        // byte.
        assert_eq!(r.bytes_written, 1);
        assert_eq!(r.rows_written, 4);

        let metadata = r.metadata;
        assert_eq!(
            metadata.encodings().collect::<Vec<_>>(),
            vec![Encoding::PLAIN, Encoding::RLE]
        );
        assert_eq!(metadata.num_values(), 4); // just values
        assert_eq!(metadata.dictionary_page_offset(), None);
    }

    #[test]
    fn test_column_writer_default_encoding_support_bool() {
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[true, false],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[true, false],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[true, false],
            None,
            &[Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
        );
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[true, false],
            None,
            &[Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
        );
    }

    #[test]
    fn test_column_writer_default_encoding_support_int32() {
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1, 2],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1, 2],
            None,
            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BINARY_PACKED,
                1,
            )],
        );
    }

    #[test]
    fn test_column_writer_default_encoding_support_int64() {
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1, 2],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1, 2],
            None,
            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BINARY_PACKED,
                1,
            )],
        );
    }

    #[test]
    fn test_column_writer_default_encoding_support_int96() {
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_1_0,
            true,
            &[Int96::from(vec![1, 2, 3])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[Int96::from(vec![1, 2, 3])],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_2_0,
            true,
            &[Int96::from(vec![1, 2, 3])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_2_0,
            false,
            &[Int96::from(vec![1, 2, 3])],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
        );
    }

    #[test]
    fn test_column_writer_default_encoding_support_float() {
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
        );
    }

    #[test]
    fn test_column_writer_default_encoding_support_double() {
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
        );
    }

    #[test]
    fn test_column_writer_default_encoding_support_byte_array() {
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[ByteArray::from(vec![1u8])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[ByteArray::from(vec![1u8])],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[ByteArray::from(vec![1u8])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[ByteArray::from(vec![1u8])],
            None,
            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BYTE_ARRAY,
                1,
            )],
        );
    }

    #[test]
    fn test_column_writer_default_encoding_support_fixed_len_byte_array() {
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[ByteArray::from(vec![1u8]).into()],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BYTE_ARRAY,
                1,
            )],
        );
    }

    #[test]
    fn test_column_writer_check_metadata() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();

        let r = writer.close().unwrap();
        assert_eq!(r.bytes_written, 20);
        assert_eq!(r.rows_written, 4);

        let metadata = r.metadata;
        assert_eq!(
            metadata.encodings().collect::<Vec<_>>(),
            vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
        );
        assert_eq!(metadata.num_values(), 4);
        assert_eq!(metadata.compressed_size(), 20);
        assert_eq!(metadata.uncompressed_size(), 20);
        assert_eq!(metadata.data_page_offset(), 0);
        assert_eq!(metadata.dictionary_page_offset(), Some(0));
        if let Some(stats) = metadata.statistics() {
            assert_eq!(stats.null_count_opt(), Some(0));
            assert_eq!(stats.distinct_count_opt(), None);
            if let Statistics::Int32(stats) = stats {
                assert_eq!(stats.min_opt().unwrap(), &1);
                assert_eq!(stats.max_opt().unwrap(), &4);
            } else {
                panic!("expecting Statistics::Int32");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }

    #[test]
    fn test_column_writer_check_byte_array_min_max() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
        writer
            .write_batch(
                &[
                    ByteArray::from(vec![
                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
                        35u8, 231u8, 90u8, 0u8, 0u8,
                    ]),
                    ByteArray::from(vec![
                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8, 62u8, 146u8,
                        152u8, 177u8, 56u8, 0u8, 0u8,
                    ]),
                    ByteArray::from(vec![
                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
                        0u8,
                    ]),
                    ByteArray::from(vec![
                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
                        44u8, 0u8, 0u8,
                    ]),
                ],
                None,
                None,
            )
            .unwrap();
        let metadata = writer.close().unwrap().metadata;
        if let Some(stats) = metadata.statistics() {
            if let Statistics::ByteArray(stats) = stats {
                assert_eq!(
                    stats.min_opt().unwrap(),
                    &ByteArray::from(vec![
                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
                        35u8, 231u8, 90u8, 0u8, 0u8,
                    ])
                );
                assert_eq!(
                    stats.max_opt().unwrap(),
                    &ByteArray::from(vec![
                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
                        44u8, 0u8, 0u8,
                    ])
                );
            } else {
                panic!("expecting Statistics::ByteArray");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }

    #[test]
    fn test_column_writer_uint32_converted_type_min_max() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
            page_writer,
            0,
            0,
            props,
        );
        writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
        let metadata = writer.close().unwrap().metadata;
        if let Some(stats) = metadata.statistics() {
            if let Statistics::Int32(stats) = stats {
                assert_eq!(stats.min_opt().unwrap(), &0);
                assert_eq!(stats.max_opt().unwrap(), &5);
            } else {
                panic!("expecting Statistics::Int32");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }

    #[test]
    fn test_column_writer_precalculated_statistics() {
        let page_writer = get_test_page_writer();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Chunk)
                .build(),
        );
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer
            .write_batch_with_statistics(
                &[1, 2, 3, 4],
                None,
                None,
                Some(&-17),
                Some(&9000),
                Some(55),
            )
            .unwrap();

        let r = writer.close().unwrap();
        assert_eq!(r.bytes_written, 20);
        assert_eq!(r.rows_written, 4);

        let metadata = r.metadata;
        assert_eq!(
            metadata.encodings().collect::<Vec<_>>(),
            vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
        );
        assert_eq!(metadata.num_values(), 4);
        assert_eq!(metadata.compressed_size(), 20);
        assert_eq!(metadata.uncompressed_size(), 20);
        assert_eq!(metadata.data_page_offset(), 0);
        assert_eq!(metadata.dictionary_page_offset(), Some(0));
        if let Some(stats) = metadata.statistics() {
            assert_eq!(stats.null_count_opt(), Some(0));
            assert_eq!(stats.distinct_count_opt().unwrap_or(0), 55);
            if let Statistics::Int32(stats) = stats {
                assert_eq!(stats.min_opt().unwrap(), &-17);
                assert_eq!(stats.max_opt().unwrap(), &9000);
            } else {
                panic!("expecting Statistics::Int32");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }

    #[test]
    fn test_mixed_precomputed_statistics() {
        let mut buf = Vec::with_capacity(100);
        let mut write = TrackedWrite::new(&mut buf);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let props = Arc::new(
            WriterProperties::builder()
                .set_write_page_header_statistics(true)
                .build(),
        );
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);

        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
        writer
            .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3))
            .unwrap();

        let r = writer.close().unwrap();

        let stats = r.metadata.statistics().unwrap();
        assert_eq!(stats.min_bytes_opt().unwrap(), 1_i32.to_le_bytes());
        assert_eq!(stats.max_bytes_opt().unwrap(), 7_i32.to_le_bytes());
        assert_eq!(stats.null_count_opt(), Some(0));
        assert!(stats.distinct_count_opt().is_none());

        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .set_read_page_statistics(true)
            .build();
        let reader = SerializedPageReader::new_with_properties(
            Arc::new(Bytes::from(buf)),
            &r.metadata,
            r.rows_written as usize,
            None,
            Arc::new(props),
        )
        .unwrap();

        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
        assert_eq!(pages.len(), 2);

        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);

        let page_statistics = pages[1].statistics().unwrap();
        assert_eq!(
            page_statistics.min_bytes_opt().unwrap(),
            1_i32.to_le_bytes()
        );
        assert_eq!(
            page_statistics.max_bytes_opt().unwrap(),
            7_i32.to_le_bytes()
        );
        assert_eq!(page_statistics.null_count_opt(), Some(0));
        assert!(page_statistics.distinct_count_opt().is_none());
    }

    #[test]
    fn test_disabled_statistics() {
        let mut buf = Vec::with_capacity(100);
        let mut write = TrackedWrite::new(&mut buf);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_writer_version(WriterVersion::PARQUET_2_0)
            .build();
        let props = Arc::new(props);

        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
        writer
            .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
            .unwrap();

        let r = writer.close().unwrap();
        assert!(r.metadata.statistics().is_none());

        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let reader = SerializedPageReader::new_with_properties(
            Arc::new(Bytes::from(buf)),
            &r.metadata,
            r.rows_written as usize,
            None,
            Arc::new(props),
        )
        .unwrap();

        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
        assert_eq!(pages.len(), 2);

        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2);

        match &pages[1] {
            Page::DataPageV2 {
                num_values,
                num_nulls,
                num_rows,
                statistics,
                ..
            } => {
                assert_eq!(*num_values, 6);
                assert_eq!(*num_nulls, 2);
                assert_eq!(*num_rows, 6);
                assert!(statistics.is_none());
            }
            _ => unreachable!(),
        }
    }

    #[test]
    fn test_column_writer_empty_column_roundtrip() {
        let props = Default::default();
        column_roundtrip::<Int32Type>(props, &[], None, None);
    }

    #[test]
    fn test_column_writer_non_nullable_values_roundtrip() {
        let props = Default::default();
        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
    }

    #[test]
    fn test_column_writer_nullable_non_repeated_values_roundtrip() {
        let props = Default::default();
        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 0);
    }

    #[test]
    fn test_column_writer_nullable_repeated_values_roundtrip() {
        let props = Default::default();
        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
    }

    #[test]
    fn test_column_writer_dictionary_fallback_small_data_page() {
        let props = WriterProperties::builder()
            .set_dictionary_page_size_limit(32)
            .set_data_page_size_limit(32)
            .build();
        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
    }

    #[test]
    fn test_column_writer_small_write_batch_size() {
        for i in &[1usize, 2, 5, 10, 11, 1023] {
            let props = WriterProperties::builder().set_write_batch_size(*i).build();

            column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
        }
    }

    #[test]
    fn test_column_writer_dictionary_disabled_v1() {
        let props = WriterProperties::builder()
            .set_writer_version(WriterVersion::PARQUET_1_0)
            .set_dictionary_enabled(false)
            .build();
        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
    }

    #[test]
    fn test_column_writer_dictionary_disabled_v2() {
        let props = WriterProperties::builder()
            .set_writer_version(WriterVersion::PARQUET_2_0)
            .set_dictionary_enabled(false)
            .build();
        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
    }

    #[test]
    fn test_column_writer_compression_v1() {
        let props = WriterProperties::builder()
            .set_writer_version(WriterVersion::PARQUET_1_0)
            .set_compression(Compression::SNAPPY)
            .build();
        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
    }

    #[test]
    fn test_column_writer_compression_v2() {
        let props = WriterProperties::builder()
            .set_writer_version(WriterVersion::PARQUET_2_0)
            .set_compression(Compression::SNAPPY)
            .build();
        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
    }

    #[test]
    fn test_column_writer_add_data_pages_with_dict() {
        // ARROW-5129: Test verifies that we add data page in case of dictionary encoding
        // and no fallback occurred so far.
        let mut file = tempfile::tempfile().unwrap();
        let mut write = TrackedWrite::new(&mut file);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let props = Arc::new(
            WriterProperties::builder()
                .set_data_page_size_limit(10)
                .set_write_batch_size(3) // write 3 values at a time
                .build(),
        );
        let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer.write_batch(data, None, None).unwrap();
        let r = writer.close().unwrap();

        drop(write);

        // Read pages and check the sequence
        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let mut page_reader = Box::new(
            SerializedPageReader::new_with_properties(
                Arc::new(file),
                &r.metadata,
                r.rows_written as usize,
                None,
                Arc::new(props),
            )
            .unwrap(),
        );
        let mut res = Vec::new();
        while let Some(page) = page_reader.get_next_page().unwrap() {
            res.push((page.page_type(), page.num_values(), page.buffer().len()));
        }
        assert_eq!(
            res,
            vec![
                (PageType::DICTIONARY_PAGE, 10, 40),
                (PageType::DATA_PAGE, 9, 10),
                (PageType::DATA_PAGE, 1, 3),
            ]
        );
        assert_eq!(
            r.metadata.page_encoding_stats(),
            Some(&vec![
                PageEncodingStats {
                    page_type: PageType::DICTIONARY_PAGE,
                    encoding: Encoding::PLAIN,
                    count: 1
                },
                PageEncodingStats {
                    page_type: PageType::DATA_PAGE,
                    encoding: Encoding::RLE_DICTIONARY,
                    count: 2,
                }
            ])
        );
    }

    #[test]
    fn test_bool_statistics() {
        let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
        // Booleans have an unsigned sort order and so are not compatible
        // with the deprecated `min` and `max` statistics
        assert!(!stats.is_min_max_backwards_compatible());
        if let Statistics::Boolean(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &false);
            assert_eq!(stats.max_opt().unwrap(), &true);
        } else {
            panic!("expecting Statistics::Boolean, got {stats:?}");
        }
    }

    #[test]
    fn test_int32_statistics() {
        let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Int32(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &-2);
            assert_eq!(stats.max_opt().unwrap(), &3);
        } else {
            panic!("expecting Statistics::Int32, got {stats:?}");
        }
    }

    #[test]
    fn test_int64_statistics() {
        let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Int64(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &-2);
            assert_eq!(stats.max_opt().unwrap(), &3);
        } else {
            panic!("expecting Statistics::Int64, got {stats:?}");
        }
    }

    #[test]
    fn test_int96_statistics() {
        let input = vec![
            Int96::from(vec![1, 20, 30]),
            Int96::from(vec![3, 20, 10]),
            Int96::from(vec![0, 20, 30]),
            Int96::from(vec![2, 20, 30]),
        ];

        let stats = statistics_roundtrip::<Int96Type>(&input);
        assert!(!stats.is_min_max_backwards_compatible());
        if let Statistics::Int96(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
            assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![2, 20, 30]));
        } else {
            panic!("expecting Statistics::Int96, got {stats:?}");
        }
    }

    #[test]
    fn test_float_statistics() {
        let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Float(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &-2.0);
            assert_eq!(stats.max_opt().unwrap(), &3.0);
        } else {
            panic!("expecting Statistics::Float, got {stats:?}");
        }
    }

    #[test]
    fn test_double_statistics() {
        let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Double(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &-2.0);
            assert_eq!(stats.max_opt().unwrap(), &3.0);
        } else {
            panic!("expecting Statistics::Double, got {stats:?}");
        }
    }

    #[test]
    fn test_byte_array_statistics() {
        let input = ["aawaa", "zz", "aaw", "m", "qrs"]
            .iter()
            .map(|&s| s.into())
            .collect::<Vec<_>>();

        let stats = statistics_roundtrip::<ByteArrayType>(&input);
        assert!(!stats.is_min_max_backwards_compatible());
        if let Statistics::ByteArray(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &ByteArray::from("aaw"));
            assert_eq!(stats.max_opt().unwrap(), &ByteArray::from("zz"));
        } else {
            panic!("expecting Statistics::ByteArray, got {stats:?}");
        }
    }

    #[test]
    fn test_fixed_len_byte_array_statistics() {
        let input = ["aawaa", "zz   ", "aaw  ", "m    ", "qrs  "]
            .iter()
            .map(|&s| ByteArray::from(s).into())
            .collect::<Vec<_>>();

        let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
        assert!(!stats.is_min_max_backwards_compatible());
        if let Statistics::FixedLenByteArray(stats) = stats {
            let expected_min: FixedLenByteArray = ByteArray::from("aaw  ").into();
            assert_eq!(stats.min_opt().unwrap(), &expected_min);
            let expected_max: FixedLenByteArray = ByteArray::from("zz   ").into();
            assert_eq!(stats.max_opt().unwrap(), &expected_max);
        } else {
            panic!("expecting Statistics::FixedLenByteArray, got {stats:?}");
        }
    }

    #[test]
    fn test_column_writer_check_float16_min_max() {
        let input = [
            -f16::ONE,
            f16::from_f32(3.0),
            -f16::from_f32(2.0),
            f16::from_f32(2.0),
        ]
        .into_iter()
        .map(|s| ByteArray::from(s).into())
        .collect::<Vec<_>>();

        let stats = float16_statistics_roundtrip(&input);
        assert!(stats.is_min_max_backwards_compatible());
        assert_eq!(
            stats.min_opt().unwrap(),
            &ByteArray::from(-f16::from_f32(2.0))
        );
        assert_eq!(
            stats.max_opt().unwrap(),
            &ByteArray::from(f16::from_f32(3.0))
        );
    }

2664    #[test]
2665    fn test_float16_statistics_nan_middle() {
2666        let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2667            .into_iter()
2668            .map(|s| ByteArray::from(s).into())
2669            .collect::<Vec<_>>();
2670
2671        let stats = float16_statistics_roundtrip(&input);
2672        assert!(stats.is_min_max_backwards_compatible());
2673        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2674        assert_eq!(
2675            stats.max_opt().unwrap(),
2676            &ByteArray::from(f16::ONE + f16::ONE)
2677        );
2678    }
2679
2680    #[test]
2681    fn test_float16_statistics_nan_start() {
2682        let input = [f16::NAN, f16::ONE, f16::ONE + f16::ONE]
2683            .into_iter()
2684            .map(|s| ByteArray::from(s).into())
2685            .collect::<Vec<_>>();
2686
2687        let stats = float16_statistics_roundtrip(&input);
2688        assert!(stats.is_min_max_backwards_compatible());
2689        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2690        assert_eq!(
2691            stats.max_opt().unwrap(),
2692            &ByteArray::from(f16::ONE + f16::ONE)
2693        );
2694    }
2695
2696    #[test]
2697    fn test_float16_statistics_nan_only() {
2698        let input = [f16::NAN, f16::NAN]
2699            .into_iter()
2700            .map(|s| ByteArray::from(s).into())
2701            .collect::<Vec<_>>();
2702
2703        let stats = float16_statistics_roundtrip(&input);
2704        assert!(stats.min_bytes_opt().is_none());
2705        assert!(stats.max_bytes_opt().is_none());
2706        assert!(stats.is_min_max_backwards_compatible());
2707    }
2708
2709    #[test]
2710    fn test_float16_statistics_zero_only() {
2711        let input = [f16::ZERO]
2712            .into_iter()
2713            .map(|s| ByteArray::from(s).into())
2714            .collect::<Vec<_>>();
2715
2716        let stats = float16_statistics_roundtrip(&input);
2717        assert!(stats.is_min_max_backwards_compatible());
2718        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2719        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2720    }
2721
2722    #[test]
2723    fn test_float16_statistics_neg_zero_only() {
2724        let input = [f16::NEG_ZERO]
2725            .into_iter()
2726            .map(|s| ByteArray::from(s).into())
2727            .collect::<Vec<_>>();
2728
2729        let stats = float16_statistics_roundtrip(&input);
2730        assert!(stats.is_min_max_backwards_compatible());
2731        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2732        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2733    }
2734
2735    #[test]
2736    fn test_float16_statistics_zero_min() {
2737        let input = [f16::ZERO, f16::ONE, f16::NAN, f16::PI]
2738            .into_iter()
2739            .map(|s| ByteArray::from(s).into())
2740            .collect::<Vec<_>>();
2741
2742        let stats = float16_statistics_roundtrip(&input);
2743        assert!(stats.is_min_max_backwards_compatible());
2744        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2745        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::PI));
2746    }
2747
2748    #[test]
2749    fn test_float16_statistics_neg_zero_max() {
2750        let input = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, -f16::PI]
2751            .into_iter()
2752            .map(|s| ByteArray::from(s).into())
2753            .collect::<Vec<_>>();
2754
2755        let stats = float16_statistics_roundtrip(&input);
2756        assert!(stats.is_min_max_backwards_compatible());
2757        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-f16::PI));
2758        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2759    }
2760
2761    #[test]
2762    fn test_float_statistics_nan_middle() {
2763        let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
2764        assert!(stats.is_min_max_backwards_compatible());
2765        if let Statistics::Float(stats) = stats {
2766            assert_eq!(stats.min_opt().unwrap(), &1.0);
2767            assert_eq!(stats.max_opt().unwrap(), &2.0);
2768        } else {
2769            panic!("expecting Statistics::Float");
2770        }
2771    }
2772
2773    #[test]
2774    fn test_float_statistics_nan_start() {
2775        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
2776        assert!(stats.is_min_max_backwards_compatible());
2777        if let Statistics::Float(stats) = stats {
2778            assert_eq!(stats.min_opt().unwrap(), &1.0);
2779            assert_eq!(stats.max_opt().unwrap(), &2.0);
2780        } else {
2781            panic!("expecting Statistics::Float");
2782        }
2783    }
2784
2785    #[test]
2786    fn test_float_statistics_nan_only() {
2787        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, f32::NAN]);
2788        assert!(stats.min_bytes_opt().is_none());
2789        assert!(stats.max_bytes_opt().is_none());
2790        assert!(stats.is_min_max_backwards_compatible());
2791        assert!(matches!(stats, Statistics::Float(_)));
2792    }
2793
2794    #[test]
2795    fn test_float_statistics_zero_only() {
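        // a single +0.0 value widens the bounds to [-0.0, +0.0]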
2796        let stats = statistics_roundtrip::<FloatType>(&[0.0]);
2797        assert!(stats.is_min_max_backwards_compatible());
2798        if let Statistics::Float(stats) = stats {
2799            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2800            assert!(stats.min_opt().unwrap().is_sign_negative());
2801            assert_eq!(stats.max_opt().unwrap(), &0.0);
2802            assert!(stats.max_opt().unwrap().is_sign_positive());
2803        } else {
2804            panic!("expecting Statistics::Float");
2805        }
2806    }
2807
2808    #[test]
2809    fn test_float_statistics_neg_zero_only() {
2810        let stats = statistics_roundtrip::<FloatType>(&[-0.0]);
2811        assert!(stats.is_min_max_backwards_compatible());
2812        if let Statistics::Float(stats) = stats {
2813            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2814            assert!(stats.min_opt().unwrap().is_sign_negative());
2815            assert_eq!(stats.max_opt().unwrap(), &0.0);
2816            assert!(stats.max_opt().unwrap().is_sign_positive());
2817        } else {
2818            panic!("expecting Statistics::Float");
2819        }
2820    }
2821
2822    #[test]
2823    fn test_float_statistics_zero_min() {
2824        let stats = statistics_roundtrip::<FloatType>(&[0.0, 1.0, f32::NAN, 2.0]);
2825        assert!(stats.is_min_max_backwards_compatible());
2826        if let Statistics::Float(stats) = stats {
2827            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2828            assert!(stats.min_opt().unwrap().is_sign_negative());
2829            assert_eq!(stats.max_opt().unwrap(), &2.0);
2830        } else {
2831            panic!("expecting Statistics::Float");
2832        }
2833    }
2834
2835    #[test]
2836    fn test_float_statistics_neg_zero_max() {
2837        let stats = statistics_roundtrip::<FloatType>(&[-0.0, -1.0, f32::NAN, -2.0]);
2838        assert!(stats.is_min_max_backwards_compatible());
2839        if let Statistics::Float(stats) = stats {
2840            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2841            assert_eq!(stats.max_opt().unwrap(), &0.0);
2842            assert!(stats.max_opt().unwrap().is_sign_positive());
2843        } else {
2844            panic!("expecting Statistics::Float");
2845        }
2846    }
2847
2848    #[test]
2849    fn test_double_statistics_nan_middle() {
2850        let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
2851        assert!(stats.is_min_max_backwards_compatible());
2852        if let Statistics::Double(stats) = stats {
2853            assert_eq!(stats.min_opt().unwrap(), &1.0);
2854            assert_eq!(stats.max_opt().unwrap(), &2.0);
2855        } else {
2856            panic!("expecting Statistics::Double");
2857        }
2858    }
2859
2860    #[test]
2861    fn test_double_statistics_nan_start() {
2862        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
2863        assert!(stats.is_min_max_backwards_compatible());
2864        if let Statistics::Double(stats) = stats {
2865            assert_eq!(stats.min_opt().unwrap(), &1.0);
2866            assert_eq!(stats.max_opt().unwrap(), &2.0);
2867        } else {
2868            panic!("expecting Statistics::Double");
2869        }
2870    }
2871
2872    #[test]
2873    fn test_double_statistics_nan_only() {
2874        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, f64::NAN]);
2875        assert!(stats.min_bytes_opt().is_none());
2876        assert!(stats.max_bytes_opt().is_none());
2877        assert!(matches!(stats, Statistics::Double(_)));
2878        assert!(stats.is_min_max_backwards_compatible());
2879    }
2880
2881    #[test]
2882    fn test_double_statistics_zero_only() {
2883        let stats = statistics_roundtrip::<DoubleType>(&[0.0]);
2884        assert!(stats.is_min_max_backwards_compatible());
2885        if let Statistics::Double(stats) = stats {
2886            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2887            assert!(stats.min_opt().unwrap().is_sign_negative());
2888            assert_eq!(stats.max_opt().unwrap(), &0.0);
2889            assert!(stats.max_opt().unwrap().is_sign_positive());
2890        } else {
2891            panic!("expecting Statistics::Double");
2892        }
2893    }
2894
2895    #[test]
2896    fn test_double_statistics_neg_zero_only() {
2897        let stats = statistics_roundtrip::<DoubleType>(&[-0.0]);
2898        assert!(stats.is_min_max_backwards_compatible());
2899        if let Statistics::Double(stats) = stats {
2900            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2901            assert!(stats.min_opt().unwrap().is_sign_negative());
2902            assert_eq!(stats.max_opt().unwrap(), &0.0);
2903            assert!(stats.max_opt().unwrap().is_sign_positive());
2904        } else {
2905            panic!("expecting Statistics::Double");
2906        }
2907    }
2908
2909    #[test]
2910    fn test_double_statistics_zero_min() {
2911        let stats = statistics_roundtrip::<DoubleType>(&[0.0, 1.0, f64::NAN, 2.0]);
2912        assert!(stats.is_min_max_backwards_compatible());
2913        if let Statistics::Double(stats) = stats {
2914            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2915            assert!(stats.min_opt().unwrap().is_sign_negative());
2916            assert_eq!(stats.max_opt().unwrap(), &2.0);
2917        } else {
2918            panic!("expecting Statistics::Double");
2919        }
2920    }
2921
2922    #[test]
2923    fn test_double_statistics_neg_zero_max() {
2924        let stats = statistics_roundtrip::<DoubleType>(&[-0.0, -1.0, f64::NAN, -2.0]);
2925        assert!(stats.is_min_max_backwards_compatible());
2926        if let Statistics::Double(stats) = stats {
2927            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2928            assert_eq!(stats.max_opt().unwrap(), &0.0);
2929            assert!(stats.max_opt().unwrap().is_sign_positive());
2930        } else {
2931            panic!("expecting Statistics::Double");
2932        }
2933    }
2934
2935    #[test]
2936    fn test_compare_greater_byte_array_decimals() {
        assert!(!compare_greater_byte_array_decimals(&[], &[]));
        assert!(compare_greater_byte_array_decimals(&[1u8], &[]));
        assert!(!compare_greater_byte_array_decimals(&[], &[1u8]));
        assert!(compare_greater_byte_array_decimals(&[1u8], &[0u8]));
        assert!(!compare_greater_byte_array_decimals(&[1u8], &[1u8]));
        assert!(compare_greater_byte_array_decimals(&[1u8, 0u8], &[0u8]));
        assert!(!compare_greater_byte_array_decimals(&[0u8, 1u8], &[1u8, 0u8]));
        assert!(!compare_greater_byte_array_decimals(&[255u8, 35u8, 0u8, 0u8], &[0u8]));
        assert!(compare_greater_byte_array_decimals(&[0u8], &[255u8, 35u8, 0u8, 0u8]));
2955    }
2956
2957    #[test]
2958    fn test_column_index_with_null_pages() {
2959        // write a single page of all nulls
2960        let page_writer = get_test_page_writer();
2961        let props = Default::default();
2962        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2963        writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap();
2964
2965        let r = writer.close().unwrap();
2966        assert!(r.column_index.is_some());
2967        let col_idx = r.column_index.unwrap();
2968        let col_idx = match col_idx {
2969            ColumnIndexMetaData::INT32(col_idx) => col_idx,
2970            _ => panic!("wrong stats type"),
2971        };
2972        // null_pages should be true for page 0
2973        assert!(col_idx.is_null_page(0));
        // min and max should be absent for an all-null page
2975        assert!(col_idx.min_value(0).is_none());
2976        assert!(col_idx.max_value(0).is_none());
2977        // null_counts should be defined and be 4 for page 0
2978        assert!(col_idx.null_count(0).is_some());
2979        assert_eq!(col_idx.null_count(0), Some(4));
2980        // there is no repetition so rep histogram should be absent
2981        assert!(col_idx.repetition_level_histogram(0).is_none());
2982        // definition_level_histogram should be present and should be 0:4, 1:0
2983        assert!(col_idx.definition_level_histogram(0).is_some());
2984        assert_eq!(col_idx.definition_level_histogram(0).unwrap(), &[4, 0]);
2985    }
2986
2987    #[test]
2988    fn test_column_offset_index_metadata() {
        // write two pages of data and check the resulting offset index and column index
2991        let page_writer = get_test_page_writer();
2992        let props = Default::default();
2993        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2994        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2995        // first page
2996        writer.flush_data_pages().unwrap();
2997        // second page
2998        writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
2999
3000        let r = writer.close().unwrap();
3001        let column_index = r.column_index.unwrap();
3002        let offset_index = r.offset_index.unwrap();
3003
3004        assert_eq!(8, r.rows_written);
3005
3006        // column index
3007        let column_index = match column_index {
3008            ColumnIndexMetaData::INT32(column_index) => column_index,
3009            _ => panic!("wrong stats type"),
3010        };
3011        assert_eq!(2, column_index.num_pages());
3012        assert_eq!(2, offset_index.page_locations.len());
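        // page mins [1, -5] descend while page maxes [4, 8] ascend, hence UNORDERED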
3013        assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
3014        for idx in 0..2 {
3015            assert!(!column_index.is_null_page(idx));
            assert_eq!(0, column_index.null_count(idx).unwrap());
3017        }
3018
3019        if let Some(stats) = r.metadata.statistics() {
3020            assert_eq!(stats.null_count_opt(), Some(0));
3021            assert_eq!(stats.distinct_count_opt(), None);
3022            if let Statistics::Int32(stats) = stats {
                // first page is [1,2,3,4], second page is [4,8,2,-5], so the
                // chunk-level min/max come from the second page; no byte increment
                // happens here, as this is not a byte-array type
3026                assert_eq!(stats.min_opt(), column_index.min_value(1));
3027                assert_eq!(stats.max_opt(), column_index.max_value(1));
3028            } else {
3029                panic!("expecting Statistics::Int32");
3030            }
3031        } else {
3032            panic!("metadata missing statistics");
3033        }
3034
3035        // page location
3036        assert_eq!(0, offset_index.page_locations[0].first_row_index);
3037        assert_eq!(4, offset_index.page_locations[1].first_row_index);
3038    }
3039
3040    /// Verify min/max value truncation in the column index works as expected
3041    #[test]
3042    fn test_column_offset_index_metadata_truncating() {
        // write long values and check truncation in the offset index and column index
        let page_writer = get_test_page_writer();
        let props = WriterProperties::builder()
            // disable chunk statistics truncation; column index truncation keeps its default
            .set_statistics_truncate_length(None)
            .build()
            .into();
3050        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3051
3052        let mut data = vec![FixedLenByteArray::default(); 3];
        // This is the expected min value - "aaa..."
        data[0].set_data(Bytes::from(vec![97_u8; 200]));
        // This is the expected max value - "ppp..." (112 is ASCII 'p')
        data[1].set_data(Bytes::from(vec![112_u8; 200]));
3057        data[2].set_data(Bytes::from(vec![98_u8; 200]));
3058
3059        writer.write_batch(&data, None, None).unwrap();
3060
3061        writer.flush_data_pages().unwrap();
3062
3063        let r = writer.close().unwrap();
3064        let column_index = r.column_index.unwrap();
3065        let offset_index = r.offset_index.unwrap();
3066
3067        let column_index = match column_index {
3068            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3069            _ => panic!("wrong stats type"),
3070        };
3071
3072        assert_eq!(3, r.rows_written);
3073
3074        // column index
3075        assert_eq!(1, column_index.num_pages());
3076        assert_eq!(1, offset_index.page_locations.len());
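        // a single page is trivially ASCENDING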
3077        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
3078        assert!(!column_index.is_null_page(0));
3079        assert_eq!(Some(0), column_index.null_count(0));
3080
3081        if let Some(stats) = r.metadata.statistics() {
3082            assert_eq!(stats.null_count_opt(), Some(0));
3083            assert_eq!(stats.distinct_count_opt(), None);
3084            if let Statistics::FixedLenByteArray(stats) = stats {
3085                let column_index_min_value = column_index.min_value(0).unwrap();
3086                let column_index_max_value = column_index.max_value(0).unwrap();
3087
3088                // Column index stats are truncated, while the column chunk's aren't.
3089                assert_ne!(stats.min_bytes_opt().unwrap(), column_index_min_value);
3090                assert_ne!(stats.max_bytes_opt().unwrap(), column_index_max_value);
3091
3092                assert_eq!(
3093                    column_index_min_value.len(),
3094                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
3095                );
3096                assert_eq!(column_index_min_value, &[97_u8; 64]);
3097                assert_eq!(
3098                    column_index_max_value.len(),
3099                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
3100                );
3101
3102                // We expect the last byte to be incremented
3103                assert_eq!(
3104                    *column_index_max_value.last().unwrap(),
3105                    *column_index_max_value.first().unwrap() + 1
3106                );
3107            } else {
3108                panic!("expecting Statistics::FixedLenByteArray");
3109            }
3110        } else {
3111            panic!("metadata missing statistics");
3112        }
3113    }
3114
3115    #[test]
3116    fn test_column_offset_index_truncating_spec_example() {
        // write the spec's example value and check the offset index and column index
3119        let page_writer = get_test_page_writer();
3120
3121        // Truncate values at 1 byte
3122        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3123        let props = Arc::new(builder.build());
3124        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3125
3126        let mut data = vec![FixedLenByteArray::default(); 1];
3127        // This is the expected min value
3128        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3129
3130        writer.write_batch(&data, None, None).unwrap();
3131
3132        writer.flush_data_pages().unwrap();
3133
3134        let r = writer.close().unwrap();
3135        let column_index = r.column_index.unwrap();
3136        let offset_index = r.offset_index.unwrap();
3137
3138        let column_index = match column_index {
3139            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3140            _ => panic!("wrong stats type"),
3141        };
3142
3143        assert_eq!(1, r.rows_written);
3144
3145        // column index
3146        assert_eq!(1, column_index.num_pages());
3147        assert_eq!(1, offset_index.page_locations.len());
3148        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
3149        assert!(!column_index.is_null_page(0));
3150        assert_eq!(Some(0), column_index.null_count(0));
3151
3152        if let Some(stats) = r.metadata.statistics() {
3153            assert_eq!(stats.null_count_opt(), Some(0));
3154            assert_eq!(stats.distinct_count_opt(), None);
3155            if let Statistics::FixedLenByteArray(_stats) = stats {
3156                let column_index_min_value = column_index.min_value(0).unwrap();
3157                let column_index_max_value = column_index.max_value(0).unwrap();
3158
3159                assert_eq!(column_index_min_value.len(), 1);
3160                assert_eq!(column_index_max_value.len(), 1);
3161
3162                assert_eq!("B".as_bytes(), column_index_min_value);
3163                assert_eq!("C".as_bytes(), column_index_max_value);
3164
3165                assert_ne!(column_index_min_value, stats.min_bytes_opt().unwrap());
3166                assert_ne!(column_index_max_value, stats.max_bytes_opt().unwrap());
3167            } else {
3168                panic!("expecting Statistics::FixedLenByteArray");
3169            }
3170        } else {
3171            panic!("metadata missing statistics");
3172        }
3173    }
3174
3175    #[test]
3176    fn test_float16_min_max_no_truncation() {
3177        // Even if we set truncation to occur at 1 byte, we should not truncate for Float16
3178        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3179        let props = Arc::new(builder.build());
3180        let page_writer = get_test_page_writer();
3181        let mut writer = get_test_float16_column_writer(page_writer, props);
3182
3183        let expected_value = f16::PI.to_le_bytes().to_vec();
3184        let data = vec![ByteArray::from(expected_value.clone()).into()];
3185        writer.write_batch(&data, None, None).unwrap();
3186        writer.flush_data_pages().unwrap();
3187
3188        let r = writer.close().unwrap();
3189
3190        // stats should still be written
3191        // ensure bytes weren't truncated for column index
3192        let column_index = r.column_index.unwrap();
3193        let column_index = match column_index {
3194            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3195            _ => panic!("wrong stats type"),
3196        };
3197        let column_index_min_bytes = column_index.min_value(0).unwrap();
3198        let column_index_max_bytes = column_index.max_value(0).unwrap();
3199        assert_eq!(expected_value, column_index_min_bytes);
3200        assert_eq!(expected_value, column_index_max_bytes);
3201
3202        // ensure bytes weren't truncated for statistics
3203        let stats = r.metadata.statistics().unwrap();
3204        if let Statistics::FixedLenByteArray(stats) = stats {
3205            let stats_min_bytes = stats.min_bytes_opt().unwrap();
3206            let stats_max_bytes = stats.max_bytes_opt().unwrap();
3207            assert_eq!(expected_value, stats_min_bytes);
3208            assert_eq!(expected_value, stats_max_bytes);
3209        } else {
3210            panic!("expecting Statistics::FixedLenByteArray");
3211        }
3212    }
3213
3214    #[test]
3215    fn test_decimal_min_max_no_truncation() {
3216        // Even if we set truncation to occur at 1 byte, we should not truncate for Decimal
3217        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3218        let props = Arc::new(builder.build());
3219        let page_writer = get_test_page_writer();
3220        let mut writer =
3221            get_test_decimals_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3222
3223        let expected_value = vec![
3224            255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8,
3225            231u8, 90u8, 0u8, 0u8,
3226        ];
3227        let data = vec![ByteArray::from(expected_value.clone()).into()];
3228        writer.write_batch(&data, None, None).unwrap();
3229        writer.flush_data_pages().unwrap();
3230
3231        let r = writer.close().unwrap();
3232
3233        // stats should still be written
3234        // ensure bytes weren't truncated for column index
3235        let column_index = r.column_index.unwrap();
3236        let column_index = match column_index {
3237            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3238            _ => panic!("wrong stats type"),
3239        };
3240        let column_index_min_bytes = column_index.min_value(0).unwrap();
3241        let column_index_max_bytes = column_index.max_value(0).unwrap();
3242        assert_eq!(expected_value, column_index_min_bytes);
3243        assert_eq!(expected_value, column_index_max_bytes);
3244
3245        // ensure bytes weren't truncated for statistics
3246        let stats = r.metadata.statistics().unwrap();
3247        if let Statistics::FixedLenByteArray(stats) = stats {
3248            let stats_min_bytes = stats.min_bytes_opt().unwrap();
3249            let stats_max_bytes = stats.max_bytes_opt().unwrap();
3250            assert_eq!(expected_value, stats_min_bytes);
3251            assert_eq!(expected_value, stats_max_bytes);
3252        } else {
3253            panic!("expecting Statistics::FixedLenByteArray");
3254        }
3255    }
3256
3257    #[test]
3258    fn test_statistics_truncating_byte_array_default() {
3259        let page_writer = get_test_page_writer();
3260
3261        // The default truncate length is 64 bytes
3262        let props = WriterProperties::builder().build().into();
3263        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
3264
3265        let mut data = vec![ByteArray::default(); 1];
3266        data[0].set_data(Bytes::from(String::from(
3267            "This string is longer than 64 bytes, so it will almost certainly be truncated.",
3268        )));
3269        writer.write_batch(&data, None, None).unwrap();
3270        writer.flush_data_pages().unwrap();
3271
3272        let r = writer.close().unwrap();
3273
3274        assert_eq!(1, r.rows_written);
3275
3276        let stats = r.metadata.statistics().expect("statistics");
3277        if let Statistics::ByteArray(_stats) = stats {
3278            let min_value = _stats.min_opt().unwrap();
3279            let max_value = _stats.max_opt().unwrap();
3280
3281            assert!(!_stats.min_is_exact());
3282            assert!(!_stats.max_is_exact());
3283
3284            let expected_len = 64;
3285            assert_eq!(min_value.len(), expected_len);
3286            assert_eq!(max_value.len(), expected_len);
3287
3288            let expected_min =
3289                "This string is longer than 64 bytes, so it will almost certainly".as_bytes();
3290            assert_eq!(expected_min, min_value.as_bytes());
3291            // note the max value is different from the min value: the last byte is incremented
3292            let expected_max =
3293                "This string is longer than 64 bytes, so it will almost certainlz".as_bytes();
3294            assert_eq!(expected_max, max_value.as_bytes());
3295        } else {
3296            panic!("expecting Statistics::ByteArray");
3297        }
3298    }
3299
3300    #[test]
3301    fn test_statistics_truncating_byte_array() {
3302        let page_writer = get_test_page_writer();
3303
3304        const TEST_TRUNCATE_LENGTH: usize = 1;
3305
3306        // Truncate values at 1 byte
3307        let builder =
3308            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3309        let props = Arc::new(builder.build());
3310        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
3311
3312        let mut data = vec![ByteArray::default(); 1];
3313        // This is the expected min value
3314        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3315
3316        writer.write_batch(&data, None, None).unwrap();
3317
3318        writer.flush_data_pages().unwrap();
3319
3320        let r = writer.close().unwrap();
3321
3322        assert_eq!(1, r.rows_written);
3323
3324        let stats = r.metadata.statistics().expect("statistics");
3325        assert_eq!(stats.null_count_opt(), Some(0));
3326        assert_eq!(stats.distinct_count_opt(), None);
3327        if let Statistics::ByteArray(_stats) = stats {
3328            let min_value = _stats.min_opt().unwrap();
3329            let max_value = _stats.max_opt().unwrap();
3330
3331            assert!(!_stats.min_is_exact());
3332            assert!(!_stats.max_is_exact());
3333
3334            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3335            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3336
3337            assert_eq!("B".as_bytes(), min_value.as_bytes());
3338            assert_eq!("C".as_bytes(), max_value.as_bytes());
3339        } else {
3340            panic!("expecting Statistics::ByteArray");
3341        }
3342    }
3343
3344    #[test]
3345    fn test_statistics_truncating_fixed_len_byte_array() {
3346        let page_writer = get_test_page_writer();
3347
3348        const TEST_TRUNCATE_LENGTH: usize = 1;
3349
3350        // Truncate values at 1 byte
3351        let builder =
3352            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3353        let props = Arc::new(builder.build());
3354        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3355
3356        let mut data = vec![FixedLenByteArray::default(); 1];
3357
3358        const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
3359        const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();
3360
3361        const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]]; // parquet specifies big-endian order for decimals
3362        const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] =
3363            [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0];
3364
3365        // This is the expected min value
3366        data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));
3367
3368        writer.write_batch(&data, None, None).unwrap();
3369
3370        writer.flush_data_pages().unwrap();
3371
3372        let r = writer.close().unwrap();
3373
3374        assert_eq!(1, r.rows_written);
3375
3376        let stats = r.metadata.statistics().expect("statistics");
3377        assert_eq!(stats.null_count_opt(), Some(0));
3378        assert_eq!(stats.distinct_count_opt(), None);
3379        if let Statistics::FixedLenByteArray(_stats) = stats {
3380            let min_value = _stats.min_opt().unwrap();
3381            let max_value = _stats.max_opt().unwrap();
3382
3383            assert!(!_stats.min_is_exact());
3384            assert!(!_stats.max_is_exact());
3385
3386            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3387            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3388
3389            assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
3390            assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());
3391
            let reconstructed_min = i128::from_be_bytes([
                min_value.as_bytes()[0], 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
            ]);

            let reconstructed_max = i128::from_be_bytes([
                max_value.as_bytes()[0], 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
            ]);
3429
3430            // check that the inner value is correctly bounded by the min/max
3431            println!("min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}");
3432            assert!(reconstructed_min <= PSEUDO_DECIMAL_VALUE);
3433            println!("max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}");
3434            assert!(reconstructed_max >= PSEUDO_DECIMAL_VALUE);
3435        } else {
3436            panic!("expecting Statistics::FixedLenByteArray");
3437        }
3438    }
3439
3440    #[test]
3441    fn test_send() {
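        // compile-time check that the column writer can be sent across threads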
3442        fn test<T: Send>() {}
3443        test::<ColumnWriterImpl<Int32Type>>();
3444    }
3445
3446    #[test]
3447    fn test_increment() {
3448        let v = increment(vec![0, 0, 0]).unwrap();
3449        assert_eq!(&v, &[0, 0, 1]);
3450
3451        // Handle overflow
3452        let v = increment(vec![0, 255, 255]).unwrap();
3453        assert_eq!(&v, &[1, 0, 0]);
3454
3455        // Return `None` if all bytes are u8::MAX
3456        let v = increment(vec![255, 255, 255]);
3457        assert!(v.is_none());
3458    }
3459
3460    #[test]
3461    fn test_increment_utf8() {
3462        let test_inc = |o: &str, expected: &str| {
3463            if let Ok(v) = String::from_utf8(increment_utf8(o).unwrap()) {
3464                // Got the expected result...
3465                assert_eq!(v, expected);
3466                // and it's greater than the original string
3467                assert!(*v > *o);
                // Also show that ByteArray-level comparison works here
3469                let mut greater = ByteArray::new();
3470                greater.set_data(Bytes::from(v));
3471                let mut original = ByteArray::new();
3472                original.set_data(Bytes::from(o.as_bytes().to_vec()));
3473                assert!(greater > original);
3474            } else {
3475                panic!("Expected incremented UTF8 string to also be valid.");
3476            }
3477        };
3478
3479        // Basic ASCII case
3480        test_inc("hello", "hellp");
3481
3482        // 1-byte ending in max 1-byte
3483        test_inc("a\u{7f}", "b");
3484
        // all chars are the max 1-byte code point; incrementing would need 2 bytes, so None
        assert!(increment_utf8("\u{7f}\u{7f}").is_none());
3487
3488        // UTF8 string
3489        test_inc("❤️🧡💛💚💙💜", "❤️🧡💛💚💙💝");
3490
3491        // 2-byte without overflow
3492        test_inc("éééé", "éééê");
3493
3494        // 2-byte that overflows lowest byte
3495        test_inc("\u{ff}\u{ff}", "\u{ff}\u{100}");
3496
3497        // 2-byte ending in max 2-byte
3498        test_inc("a\u{7ff}", "b");
3499
        // all chars are the max 2-byte code point; incrementing would need 3 bytes, so None
        assert!(increment_utf8("\u{7ff}\u{7ff}").is_none());
3502
3503        // 3-byte without overflow [U+800, U+800] -> [U+800, U+801] (note that these
3504        // characters should render right to left).
3505        test_inc("ࠀࠀ", "ࠀࠁ");
3506
3507        // 3-byte ending in max 3-byte
3508        test_inc("a\u{ffff}", "b");
3509
        // all chars are the max 3-byte code point; incrementing would need 4 bytes, so None
        assert!(increment_utf8("\u{ffff}\u{ffff}").is_none());
3512
3513        // 4-byte without overflow
3514        test_inc("𐀀𐀀", "𐀀𐀁");
3515
3516        // 4-byte ending in max unicode
3517        test_inc("a\u{10ffff}", "b");
3518
        // U+10FFFF is the maximum code point, so incrementing must return None
        assert!(increment_utf8("\u{10ffff}\u{10ffff}").is_none());
3521
        // Skip over the surrogate pair range (0xD800..=0xDFFF): incrementing U+D7FF
        // would land on a surrogate, so the last char is dropped and 'a' incremented
        test_inc("a\u{D7FF}", "b");
3525    }
3526
3527    #[test]
3528    fn test_truncate_utf8() {
3529        // No-op
3530        let data = "❤️🧡💛💚💙💜";
3531        let r = truncate_utf8(data, data.len()).unwrap();
3532        assert_eq!(r.len(), data.len());
3533        assert_eq!(&r, data.as_bytes());
3534
        // The requested length falls mid code point, so we back off to the nearest UTF-8 boundary
3536        let r = truncate_utf8(data, 13).unwrap();
3537        assert_eq!(r.len(), 10);
3538        assert_eq!(&r, "❤️🧡".as_bytes());
3539
3540        // One multi-byte code point, and a length shorter than it, so we can't slice it
3541        let r = truncate_utf8("\u{0836}", 1);
3542        assert!(r.is_none());
3543
3544        // Test truncate and increment for max bounds on UTF-8 statistics
3545        // 7-bit (i.e. ASCII)
3546        let r = truncate_and_increment_utf8("yyyyyyyyy", 8).unwrap();
3547        assert_eq!(&r, "yyyyyyyz".as_bytes());
3548
3549        // 2-byte without overflow
3550        let r = truncate_and_increment_utf8("ééééé", 7).unwrap();
3551        assert_eq!(&r, "ééê".as_bytes());
3552
3553        // 2-byte that overflows lowest byte
3554        let r = truncate_and_increment_utf8("\u{ff}\u{ff}\u{ff}\u{ff}\u{ff}", 8).unwrap();
3555        assert_eq!(&r, "\u{ff}\u{ff}\u{ff}\u{100}".as_bytes());
3556
3557        // max 2-byte should not truncate as it would need 3-byte code points
3558        let r = truncate_and_increment_utf8("߿߿߿߿߿", 8);
3559        assert!(r.is_none());
3560
3561        // 3-byte without overflow [U+800, U+800, U+800] -> [U+800, U+801] (note that these
3562        // characters should render right to left).
3563        let r = truncate_and_increment_utf8("ࠀࠀࠀࠀ", 8).unwrap();
3564        assert_eq!(&r, "ࠀࠁ".as_bytes());
3565
3566        // max 3-byte should not truncate as it would need 4-byte code points
3567        let r = truncate_and_increment_utf8("\u{ffff}\u{ffff}\u{ffff}", 8);
3568        assert!(r.is_none());
3569
3570        // 4-byte without overflow
3571        let r = truncate_and_increment_utf8("𐀀𐀀𐀀𐀀", 9).unwrap();
3572        assert_eq!(&r, "𐀀𐀁".as_bytes());
3573
3574        // max 4-byte should not truncate
3575        let r = truncate_and_increment_utf8("\u{10ffff}\u{10ffff}", 8);
3576        assert!(r.is_none());
3577    }
3578
3579    #[test]
3580    // Check fallback truncation of statistics that should be UTF-8, but aren't
3581    // (see https://github.com/apache/arrow-rs/pull/6870).
3582    fn test_byte_array_truncate_invalid_utf8_statistics() {
3583        let message_type = "
3584            message test_schema {
3585                OPTIONAL BYTE_ARRAY a (UTF8);
3586            }
3587        ";
3588        let schema = Arc::new(parse_message_type(message_type).unwrap());
3589
3590        // Create Vec<ByteArray> containing non-UTF8 bytes
3591        let data = vec![ByteArray::from(vec![128u8; 32]); 7];
3592        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
3593        let file: File = tempfile::tempfile().unwrap();
3594        let props = Arc::new(
3595            WriterProperties::builder()
3596                .set_statistics_enabled(EnabledStatistics::Chunk)
3597                .set_statistics_truncate_length(Some(8))
3598                .build(),
3599        );
3600
3601        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
3602        let mut row_group_writer = writer.next_row_group().unwrap();
3603
3604        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
3605        col_writer
3606            .typed::<ByteArrayType>()
3607            .write_batch(&data, Some(&def_levels), None)
3608            .unwrap();
3609        col_writer.close().unwrap();
3610        row_group_writer.close().unwrap();
3611        let file_metadata = writer.close().unwrap();
3612        let stats = file_metadata.row_group(0).column(0).statistics().unwrap();
3613        assert!(!stats.max_is_exact());
3614        // Truncation of invalid UTF-8 should fall back to binary truncation, so last byte should
3615        // be incremented by 1.
3616        assert_eq!(
3617            stats.max_bytes_opt().map(|v| v.to_vec()),
3618            Some([128, 128, 128, 128, 128, 128, 128, 129].to_vec())
3619        );
3620    }
3621
3622    #[test]
3623    fn test_increment_max_binary_chars() {
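        // trailing 0xFF bytes wrap to 0x00 and the carry propagates leftwards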
3624        let r = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]);
3625        assert_eq!(&r.unwrap(), &[0xFF, 0xFE, 0xFE, 0x00, 0x00]);
3626
3627        let incremented = increment(vec![0xFF, 0xFF, 0xFF]);
3628        assert!(incremented.is_none())
3629    }
3630
3631    #[test]
3632    fn test_no_column_index_when_stats_disabled() {
3633        // https://github.com/apache/arrow-rs/issues/6010
3634        // Test that column index is not created/written for all-nulls column when page
3635        // statistics are disabled.
3636        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3637        let props = Arc::new(
3638            WriterProperties::builder()
3639                .set_statistics_enabled(EnabledStatistics::None)
3640                .build(),
3641        );
3642        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3643        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3644
3645        let data = Vec::new();
3646        let def_levels = vec![0; 10];
3647        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3648        writer.flush_data_pages().unwrap();
3649
3650        let column_close_result = writer.close().unwrap();
3651        assert!(column_close_result.offset_index.is_some());
3652        assert!(column_close_result.column_index.is_none());
3653    }
3654
3655    #[test]
3656    fn test_no_offset_index_when_disabled() {
3657        // Test that offset indexes can be disabled
3658        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3659        let props = Arc::new(
3660            WriterProperties::builder()
3661                .set_statistics_enabled(EnabledStatistics::None)
3662                .set_offset_index_disabled(true)
3663                .build(),
3664        );
3665        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3666        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3667
3668        let data = Vec::new();
3669        let def_levels = vec![0; 10];
3670        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3671        writer.flush_data_pages().unwrap();
3672
3673        let column_close_result = writer.close().unwrap();
3674        assert!(column_close_result.offset_index.is_none());
3675        assert!(column_close_result.column_index.is_none());
3676    }
3677
3678    #[test]
3679    fn test_offset_index_overridden() {
3680        // Test that offset indexes are not disabled when gathering page statistics
3681        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3682        let props = Arc::new(
3683            WriterProperties::builder()
3684                .set_statistics_enabled(EnabledStatistics::Page)
3685                .set_offset_index_disabled(true)
3686                .build(),
3687        );
3688        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3689        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3690
3691        let data = Vec::new();
3692        let def_levels = vec![0; 10];
3693        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3694        writer.flush_data_pages().unwrap();
3695
3696        let column_close_result = writer.close().unwrap();
3697        assert!(column_close_result.offset_index.is_some());
3698        assert!(column_close_result.column_index.is_some());
3699    }
3700
3701    #[test]
3702    fn test_boundary_order() -> Result<()> {
3703        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3704        // min max both ascending
3705        let column_close_result = write_multiple_pages::<Int32Type>(
3706            &descr,
3707            &[
3708                &[Some(-10), Some(10)],
3709                &[Some(-5), Some(11)],
3710                &[None],
3711                &[Some(-5), Some(11)],
3712            ],
3713        )?;
3714        let boundary_order = column_close_result
3715            .column_index
3716            .unwrap()
3717            .get_boundary_order();
3718        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3719
3720        // min max both descending
3721        let column_close_result = write_multiple_pages::<Int32Type>(
3722            &descr,
3723            &[
3724                &[Some(10), Some(11)],
3725                &[Some(5), Some(11)],
3726                &[None],
3727                &[Some(-5), Some(0)],
3728            ],
3729        )?;
3730        let boundary_order = column_close_result
3731            .column_index
3732            .unwrap()
3733            .get_boundary_order();
3734        assert_eq!(boundary_order, Some(BoundaryOrder::DESCENDING));
3735
3736        // min max both equal
3737        let column_close_result = write_multiple_pages::<Int32Type>(
3738            &descr,
3739            &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]],
3740        )?;
3741        let boundary_order = column_close_result
3742            .column_index
3743            .unwrap()
3744            .get_boundary_order();
3745        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3746
3747        // only nulls
3748        let column_close_result =
3749            write_multiple_pages::<Int32Type>(&descr, &[&[None], &[None], &[None]])?;
3750        let boundary_order = column_close_result
3751            .column_index
3752            .unwrap()
3753            .get_boundary_order();
3754        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3755
3756        // one page
3757        let column_close_result =
3758            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)]])?;
3759        let boundary_order = column_close_result
3760            .column_index
3761            .unwrap()
3762            .get_boundary_order();
3763        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3764
3765        // one non-null page
3766        let column_close_result =
3767            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)], &[None]])?;
3768        let boundary_order = column_close_result
3769            .column_index
3770            .unwrap()
3771            .get_boundary_order();
3772        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3773
3774        // min max both unordered
3775        let column_close_result = write_multiple_pages::<Int32Type>(
3776            &descr,
3777            &[
3778                &[Some(10), Some(11)],
3779                &[Some(11), Some(16)],
3780                &[None],
3781                &[Some(-5), Some(0)],
3782            ],
3783        )?;
3784        let boundary_order = column_close_result
3785            .column_index
3786            .unwrap()
3787            .get_boundary_order();
3788        assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));
3789
3790        // min max both ordered in different orders
3791        let column_close_result = write_multiple_pages::<Int32Type>(
3792            &descr,
3793            &[
3794                &[Some(1), Some(9)],
3795                &[Some(2), Some(8)],
3796                &[None],
3797                &[Some(3), Some(7)],
3798            ],
3799        )?;
3800        let boundary_order = column_close_result
3801            .column_index
3802            .unwrap()
3803            .get_boundary_order();
3804        assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));
3805
3806        Ok(())
3807    }
3808
3809    #[test]
3810    fn test_boundary_order_logical_type() -> Result<()> {
3811        // ensure that logical types account for different sort order than underlying
3812        // physical type representation
3813        let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));
3814        let fba_descr = {
3815            let tpe = SchemaType::primitive_type_builder(
3816                "col",
3817                FixedLenByteArrayType::get_physical_type(),
3818            )
3819            .with_length(2)
3820            .build()?;
3821            Arc::new(ColumnDescriptor::new(
3822                Arc::new(tpe),
3823                1,
3824                0,
3825                ColumnPath::from("col"),
3826            ))
3827        };
3828
3829        let values: &[&[Option<FixedLenByteArray>]] = &[
3830            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ONE)))],
3831            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ZERO)))],
3832            &[Some(FixedLenByteArray::from(ByteArray::from(
3833                f16::NEG_ZERO,
3834            )))],
3835            &[Some(FixedLenByteArray::from(ByteArray::from(f16::NEG_ONE)))],
3836        ];
3837
3838        // f16 descending
3839        let column_close_result =
3840            write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?;
3841        let boundary_order = column_close_result
3842            .column_index
3843            .unwrap()
3844            .get_boundary_order();
3845        assert_eq!(boundary_order, Some(BoundaryOrder::DESCENDING));
3846
3847        // same bytes, but fba unordered
3848        let column_close_result =
3849            write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?;
3850        let boundary_order = column_close_result
3851            .column_index
3852            .unwrap()
3853            .get_boundary_order();
3854        assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));
3855
3856        Ok(())
3857    }
3858
3859    #[test]
3860    fn test_interval_stats_should_not_have_min_max() {
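        // INTERVAL has no defined sort order, so min/max statistics must be omitted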
3861        let input = [
3862            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
3863            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
3864            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2],
3865        ]
3866        .into_iter()
3867        .map(|s| ByteArray::from(s).into())
3868        .collect::<Vec<_>>();
3869
3870        let page_writer = get_test_page_writer();
3871        let mut writer = get_test_interval_column_writer(page_writer);
3872        writer.write_batch(&input, None, None).unwrap();
3873
3874        let metadata = writer.close().unwrap().metadata;
3875        let stats = if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
3876            stats.clone()
3877        } else {
3878            panic!("metadata missing statistics");
3879        };
3880        assert!(stats.min_bytes_opt().is_none());
3881        assert!(stats.max_bytes_opt().is_none());
3882    }
3883
3884    #[test]
3885    #[cfg(feature = "arrow")]
3886    fn test_column_writer_get_estimated_total_bytes() {
3887        let page_writer = get_test_page_writer();
3888        let props = Default::default();
3889        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
3890        assert_eq!(writer.get_estimated_total_bytes(), 0);
3891
3892        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
3893        writer.add_data_page().unwrap();
3894        let size_with_one_page = writer.get_estimated_total_bytes();
3895        assert_eq!(size_with_one_page, 20);
3896
3897        writer.write_batch(&[5, 6, 7, 8], None, None).unwrap();
3898        writer.add_data_page().unwrap();
3899        let size_with_two_pages = writer.get_estimated_total_bytes();
3900        // different pages have different compressed lengths
3901        assert_eq!(size_with_two_pages, 20 + 21);
3902    }
3903
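    /// Writes each slice in `pages` as its own data page (definition level 1 for
    /// values, 0 for nulls) and returns the close result so tests can inspect the
    /// resulting column index.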
3904    fn write_multiple_pages<T: DataType>(
3905        column_descr: &Arc<ColumnDescriptor>,
3906        pages: &[&[Option<T::T>]],
3907    ) -> Result<ColumnCloseResult> {
3908        let column_writer = get_column_writer(
3909            column_descr.clone(),
3910            Default::default(),
3911            get_test_page_writer(),
3912        );
3913        let mut writer = get_typed_column_writer::<T>(column_writer);
3914
3915        for &page in pages {
3916            let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
3917            let def_levels = page
3918                .iter()
3919                .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
3920                .collect::<Vec<_>>();
3921            writer.write_batch(&values, Some(&def_levels), None)?;
3922            writer.flush_data_pages()?;
3923        }
3924
3925        writer.close()
3926    }
3927
3928    /// Performs write-read roundtrip with randomly generated values and levels.
    /// `max_size` is the maximum number of values (or levels, when `max_def_level` > 0)
    /// to write for a column.
3931    fn column_roundtrip_random<T: DataType>(
3932        props: WriterProperties,
3933        max_size: usize,
3934        min_value: T::T,
3935        max_value: T::T,
3936        max_def_level: i16,
3937        max_rep_level: i16,
3938    ) where
3939        T::T: PartialOrd + SampleUniform + Copy,
3940    {
3941        let mut num_values: usize = 0;
3942
3943        let mut buf: Vec<i16> = Vec::new();
3944        let def_levels = if max_def_level > 0 {
3945            random_numbers_range(max_size, 0, max_def_level + 1, &mut buf);
3946            for &dl in &buf[..] {
3947                if dl == max_def_level {
3948                    num_values += 1;
3949                }
3950            }
3951            Some(&buf[..])
3952        } else {
3953            num_values = max_size;
3954            None
3955        };
3956
3957        let mut buf: Vec<i16> = Vec::new();
3958        let rep_levels = if max_rep_level > 0 {
3959            random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
3960            buf[0] = 0; // Must start on record boundary
3961            Some(&buf[..])
3962        } else {
3963            None
3964        };
3965
3966        let mut values: Vec<T::T> = Vec::new();
3967        random_numbers_range(num_values, min_value, max_value, &mut values);
3968
3969        column_roundtrip::<T>(props, &values[..], def_levels, rep_levels);
3970    }
3971
3972    /// Performs write-read roundtrip and asserts written values and levels.
3973    fn column_roundtrip<T: DataType>(
3974        props: WriterProperties,
3975        values: &[T::T],
3976        def_levels: Option<&[i16]>,
3977        rep_levels: Option<&[i16]>,
3978    ) {
3979        let mut file = tempfile::tempfile().unwrap();
3980        let mut write = TrackedWrite::new(&mut file);
3981        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
3982
3983        let max_def_level = match def_levels {
3984            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3985            None => 0i16,
3986        };
3987
3988        let max_rep_level = match rep_levels {
3989            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3990            None => 0i16,
3991        };
3992
3993        let mut max_batch_size = values.len();
3994        if let Some(levels) = def_levels {
3995            max_batch_size = max_batch_size.max(levels.len());
3996        }
3997        if let Some(levels) = rep_levels {
3998            max_batch_size = max_batch_size.max(levels.len());
3999        }
4000
4001        let mut writer =
4002            get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));
4003
4004        let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
4005        assert_eq!(values_written, values.len());
4006        let result = writer.close().unwrap();
4007
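        // release the borrow on `file` so it can be moved into the reader below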
4008        drop(write);
4009
4010        let props = ReaderProperties::builder()
4011            .set_backward_compatible_lz4(false)
4012            .build();
4013        let page_reader = Box::new(
4014            SerializedPageReader::new_with_properties(
4015                Arc::new(file),
4016                &result.metadata,
4017                result.rows_written as usize,
4018                None,
4019                Arc::new(props),
4020            )
4021            .unwrap(),
4022        );
4023        let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level);
4024
4025        let mut actual_values = Vec::with_capacity(max_batch_size);
4026        let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size));
4027        let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size));
4028
4029        let (_, values_read, levels_read) = reader
4030            .read_records(
4031                max_batch_size,
4032                actual_def_levels.as_mut(),
4033                actual_rep_levels.as_mut(),
4034                &mut actual_values,
4035            )
4036            .unwrap();
4037
4038        // Assert values, definition and repetition levels.
4039
4040        assert_eq!(&actual_values[..values_read], values);
4041        match actual_def_levels {
4042            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels),
4043            None => assert_eq!(None, def_levels),
4044        }
4045        match actual_rep_levels {
4046            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels),
4047            None => assert_eq!(None, rep_levels),
4048        }
4049
4050        // Assert written rows.
4051
4052        if let Some(levels) = actual_rep_levels {
4053            let mut actual_rows_written = 0;
4054            for l in levels {
4055                if l == 0 {
4056                    actual_rows_written += 1;
4057                }
4058            }
4059            assert_eq!(actual_rows_written, result.rows_written);
4060        } else if actual_def_levels.is_some() {
4061            assert_eq!(levels_read as u64, result.rows_written);
4062        } else {
4063            assert_eq!(values_read as u64, result.rows_written);
4064        }
4065    }

    /// Writes the provided values and returns the column chunk metadata for them.
    /// Used to test encoding support in the column writer.
    fn column_write_and_get_metadata<T: DataType>(
        props: WriterProperties,
        values: &[T::T],
    ) -> ColumnChunkMetaData {
        let page_writer = get_test_page_writer();
        let props = Arc::new(props);
        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
        writer.write_batch(values, None, None).unwrap();
        writer.close().unwrap().metadata
    }
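
    // A minimal sketch (not part of the original suite) showing how
    // `column_write_and_get_metadata` can be used; the property choices and
    // the expected count are illustrative assumptions.
    #[test]
    fn test_column_write_and_get_metadata_sketch() {
        let props = WriterProperties::builder()
            .set_dictionary_enabled(false)
            .build();
        let meta = column_write_and_get_metadata::<Int32Type>(props, &[1, 2, 3]);
        // All three values land in a single column chunk.
        assert_eq!(meta.num_values(), 3);
    }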

    // Helper function to more compactly create a PageEncodingStats struct.
    fn encoding_stats(page_type: PageType, encoding: Encoding, count: i32) -> PageEncodingStats {
        PageEncodingStats {
            page_type,
            encoding,
            count,
        }
    }
    // Function used in tests for EncodingWriteSupport. It checks the dictionary
    // page offset, encodings, and page encoding stats to make sure the column
    // writer uses the encodings implied by the provided properties.
    fn check_encoding_write_support<T: DataType>(
        version: WriterVersion,
        dict_enabled: bool,
        data: &[T::T],
        dictionary_page_offset: Option<i64>,
        encodings: &[Encoding],
        page_encoding_stats: &[PageEncodingStats],
    ) {
        let props = WriterProperties::builder()
            .set_writer_version(version)
            .set_dictionary_enabled(dict_enabled)
            .build();
        let meta = column_write_and_get_metadata::<T>(props, data);
        assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
        assert_eq!(meta.encodings().collect::<Vec<_>>(), encodings);
        assert_eq!(meta.page_encoding_stats().unwrap(), page_encoding_stats);
    }
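
    // A minimal sketch of invoking `check_encoding_write_support`; the
    // expected offset, encodings, and stats below are assumptions chosen for
    // illustration (v1 writer, dictionary disabled), not values taken from
    // the original suite.
    #[test]
    fn test_check_encoding_write_support_sketch() {
        // With the dictionary disabled there is no dictionary page, so the
        // dictionary page offset is `None`, the data page is PLAIN encoded,
        // and RLE is used for the levels.
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1, 2, 3],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
    }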

    /// Returns column writer.
    fn get_test_column_writer<'a, T: DataType>(
        page_writer: Box<dyn PageWriter + 'a>,
        max_def_level: i16,
        max_rep_level: i16,
        props: WriterPropertiesPtr,
    ) -> ColumnWriterImpl<'a, T> {
        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
        let column_writer = get_column_writer(descr, props, page_writer);
        get_typed_column_writer::<T>(column_writer)
    }

    /// Returns column reader.
    fn get_test_column_reader<T: DataType>(
        page_reader: Box<dyn PageReader>,
        max_def_level: i16,
        max_rep_level: i16,
    ) -> ColumnReaderImpl<T> {
        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
        let column_reader = get_column_reader(descr, page_reader);
        get_typed_column_reader::<T>(column_reader)
    }
    /// Returns a descriptor for a primitive column.
    fn get_test_column_descr<T: DataType>(
        max_def_level: i16,
        max_rep_level: i16,
    ) -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
            // length is set for the "encoding support" tests for the
            // FIXED_LEN_BYTE_ARRAY type; it should be a no-op for other types
            .with_length(1)
            .build()
            .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
    }

    /// Returns page writer that collects pages without serializing them.
    fn get_test_page_writer() -> Box<dyn PageWriter> {
        Box::new(TestPageWriter {})
    }

    /// Page writer that records page metadata but discards the page bytes,
    /// so tests can exercise the column writer without performing real I/O.
    struct TestPageWriter {}

    impl PageWriter for TestPageWriter {
        fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
            let mut res = PageWriteSpec::new();
            res.page_type = page.page_type();
            res.uncompressed_size = page.uncompressed_size();
            res.compressed_size = page.compressed_size();
            res.num_values = page.num_values();
            res.offset = 0;
            res.bytes_written = page.data().len() as u64;
            Ok(res)
        }

        fn close(&mut self) -> Result<()> {
            Ok(())
        }
    }

    /// Writes data into parquet using [`get_test_page_writer`] and
    /// [`get_test_column_writer`] and returns the generated statistics.
    fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
        writer.write_batch(values, None, None).unwrap();

        let metadata = writer.close().unwrap().metadata;
        if let Some(stats) = metadata.statistics() {
            stats.clone()
        } else {
            panic!("metadata missing statistics");
        }
    }
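
    // Sketch (assumed data, not from the original suite): statistics written
    // for an INT32 column surface as the `Statistics::Int32` variant.
    #[test]
    fn test_statistics_roundtrip_sketch() {
        let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
        assert!(matches!(stats, Statistics::Int32(_)));
    }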

    /// Returns a column writer for Decimal values.
    fn get_test_decimals_column_writer<T: DataType>(
        page_writer: Box<dyn PageWriter>,
        max_def_level: i16,
        max_rep_level: i16,
        props: WriterPropertiesPtr,
    ) -> ColumnWriterImpl<'static, T> {
        let descr = Arc::new(get_test_decimals_column_descr::<T>(
            max_def_level,
            max_rep_level,
        ));
        let column_writer = get_column_writer(descr, props, page_writer);
        get_typed_column_writer::<T>(column_writer)
    }

    /// Returns a descriptor for a primitive column with a Decimal logical type.
    fn get_test_decimals_column_descr<T: DataType>(
        max_def_level: i16,
        max_rep_level: i16,
    ) -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
            .with_length(16)
            .with_logical_type(Some(LogicalType::Decimal {
                scale: 2,
                precision: 3,
            }))
            .with_scale(2)
            .with_precision(3)
            .build()
            .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
    }
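
    // Sketch (assumed data, not from the original suite): DECIMAL(3, 2)
    // values backed by INT32 store the unscaled integer, e.g. 1.23 is
    // written as 123.
    #[test]
    fn test_decimals_column_writer_sketch() {
        let mut writer = get_test_decimals_column_writer::<Int32Type>(
            get_test_page_writer(),
            0,
            0,
            Default::default(),
        );
        writer.write_batch(&[123, -45], None, None).unwrap();
        let result = writer.close().unwrap();
        assert_eq!(result.rows_written, 2);
    }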

    fn float16_statistics_roundtrip(
        values: &[FixedLenByteArray],
    ) -> ValueStatistics<FixedLenByteArray> {
        let page_writer = get_test_page_writer();
        let mut writer = get_test_float16_column_writer(page_writer, Default::default());
        writer.write_batch(values, None, None).unwrap();

        let metadata = writer.close().unwrap().metadata;
        if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
            stats.clone()
        } else {
            panic!("metadata missing statistics");
        }
    }
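
    // Sketch (assumed input and accessors, not from the original suite):
    // Float16 values are 2-byte FIXED_LEN_BYTE_ARRAYs, so a test can build
    // them from the little-endian bytes of `half::f16` values; the
    // `min_opt`/`max_opt` accessors on `ValueStatistics` are assumed here.
    #[test]
    fn test_float16_statistics_roundtrip_sketch() {
        let values: Vec<FixedLenByteArray> = [f16::ONE, f16::from_f32(-1.5)]
            .iter()
            .map(|v| ByteArray::from(v.to_le_bytes().to_vec()).into())
            .collect();
        let stats = float16_statistics_roundtrip(&values);
        // -1.5 orders below 1.0, so it is the minimum and 1.0 the maximum.
        assert_eq!(stats.min_opt(), Some(&values[1]));
        assert_eq!(stats.max_opt(), Some(&values[0]));
    }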

    fn get_test_float16_column_writer(
        page_writer: Box<dyn PageWriter>,
        props: WriterPropertiesPtr,
    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
        let descr = Arc::new(get_test_float16_column_descr(0, 0));
        let column_writer = get_column_writer(descr, props, page_writer);
        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
    }

    fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe =
            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
                .with_length(2)
                .with_logical_type(Some(LogicalType::Float16))
                .build()
                .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
    }

    fn get_test_interval_column_writer(
        page_writer: Box<dyn PageWriter>,
    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
        let descr = Arc::new(get_test_interval_column_descr());
        let column_writer = get_column_writer(descr, Default::default(), page_writer);
        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
    }

    fn get_test_interval_column_descr() -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe =
            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
                .with_length(12)
                .with_converted_type(ConvertedType::INTERVAL)
                .build()
                .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
    }
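
    // Sketch (assumed data, not from the original suite): INTERVAL values
    // are 12-byte FIXED_LEN_BYTE_ARRAYs holding months, days, and
    // milliseconds as little-endian u32s, so the writer from
    // `get_test_interval_column_writer` accepts values built that way.
    #[test]
    fn test_interval_column_writer_sketch() {
        let mut writer = get_test_interval_column_writer(get_test_page_writer());
        // 1 month, 2 days, 3 milliseconds.
        let mut bytes = Vec::new();
        bytes.extend_from_slice(&1u32.to_le_bytes());
        bytes.extend_from_slice(&2u32.to_le_bytes());
        bytes.extend_from_slice(&3u32.to_le_bytes());
        let value: FixedLenByteArray = ByteArray::from(bytes).into();
        writer.write_batch(&[value], None, None).unwrap();
        let result = writer.close().unwrap();
        assert_eq!(result.rows_written, 1);
    }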

    /// Returns a column writer for a UINT_32 column provided as ConvertedType only.
    fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
        page_writer: Box<dyn PageWriter + 'a>,
        max_def_level: i16,
        max_rep_level: i16,
        props: WriterPropertiesPtr,
    ) -> ColumnWriterImpl<'a, T> {
        let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
            max_def_level,
            max_rep_level,
        ));
        let column_writer = get_column_writer(descr, props, page_writer);
        get_typed_column_writer::<T>(column_writer)
    }

    /// Returns a column descriptor for a UINT_32 column provided as ConvertedType only.
    fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
        max_def_level: i16,
        max_rep_level: i16,
    ) -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
            .with_converted_type(ConvertedType::UINT_32)
            .build()
            .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
    }
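
    // Sketch (assumed data, not from the original suite): a UINT_32 column
    // declared via ConvertedType only is still written through the INT32
    // physical type, so plain i32 values round-trip through the writer.
    #[test]
    fn test_unsigned_converted_type_writer_sketch() {
        let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
            get_test_page_writer(),
            0,
            0,
            Default::default(),
        );
        writer.write_batch(&[0, 1, 2], None, None).unwrap();
        let result = writer.close().unwrap();
        assert_eq!(result.rows_written, 3);
    }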

    #[test]
    fn test_page_v2_snappy_compression_fallback() {
        // Test that PageV2 sets is_compressed to false when Snappy compression increases data size
        let page_writer = TestPageWriter {};

        // Create WriterProperties with PageV2 and Snappy compression
        let props = WriterProperties::builder()
            .set_writer_version(WriterVersion::PARQUET_2_0)
            // Disable dictionary to ensure data is written directly
            .set_dictionary_enabled(false)
            .set_compression(Compression::SNAPPY)
            .build();

        let mut column_writer =
            get_test_column_writer::<ByteArrayType>(Box::new(page_writer), 0, 0, Arc::new(props));

        // Create small, simple data that Snappy compression will likely increase in size
        // due to compression overhead for very small data
        let values = vec![ByteArray::from("a")];

        column_writer.write_batch(&values, None, None).unwrap();

        let result = column_writer.close().unwrap();
        // When the page is left uncompressed, both sizes must match.
        assert_eq!(
            result.metadata.uncompressed_size(),
            result.metadata.compressed_size()
        );
    }
}