parquet/column/writer/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains column writer API.
19
20use bytes::Bytes;
21use half::f16;
22
23use crate::bloom_filter::Sbbf;
24use crate::file::page_index::column_index::ColumnIndexMetaData;
25use crate::file::page_index::offset_index::OffsetIndexMetaData;
26use std::collections::{BTreeSet, VecDeque};
27use std::str;
28
29use crate::basic::{
30    BoundaryOrder, Compression, ConvertedType, Encoding, LogicalType, PageType, Type,
31};
32use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
33use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
34use crate::compression::{Codec, CodecOptionsBuilder, create_codec};
35use crate::data_type::private::ParquetValueType;
36use crate::data_type::*;
37use crate::encodings::levels::LevelEncoder;
38#[cfg(feature = "encryption")]
39use crate::encryption::encrypt::get_column_crypto_metadata;
40use crate::errors::{ParquetError, Result};
41use crate::file::metadata::{
42    ColumnChunkMetaData, ColumnChunkMetaDataBuilder, ColumnIndexBuilder, LevelHistogram,
43    OffsetIndexBuilder, PageEncodingStats,
44};
45use crate::file::properties::{
46    EnabledStatistics, WriterProperties, WriterPropertiesPtr, WriterVersion,
47};
48use crate::file::statistics::{Statistics, ValueStatistics};
49use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
50
51pub(crate) mod encoder;
52
/// Dispatches on every [`ColumnWriter`] variant, binding the wrapped typed writer.
///
/// `downcast_writer!(writer, name, body)` matches `writer` against each
/// `Self::*ColumnWriter(name)` variant and evaluates `body` with `name` bound to the
/// wrapped value. It must be expanded inside an `impl` whose `Self` has these variants.
macro_rules! downcast_writer {
    ($writer:expr, $inner:ident, $body:expr) => {
        match $writer {
            Self::BoolColumnWriter($inner) => $body,
            Self::Int32ColumnWriter($inner) => $body,
            Self::Int64ColumnWriter($inner) => $body,
            Self::Int96ColumnWriter($inner) => $body,
            Self::FloatColumnWriter($inner) => $body,
            Self::DoubleColumnWriter($inner) => $body,
            Self::ByteArrayColumnWriter($inner) => $body,
            Self::FixedLenByteArrayColumnWriter($inner) => $body,
        }
    };
}
67
/// Column writer for a Parquet type.
///
/// Each variant wraps a [`ColumnWriterImpl`] specialised for one Parquet physical type.
/// See [`get_column_writer`] to create instances of this type, and
/// [`get_typed_column_writer`] (or its `_ref` / `_mut` variants) to unwrap a variant into
/// its typed writer.
pub enum ColumnWriter<'a> {
    /// Column writer for boolean type
    BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
    /// Column writer for int32 type
    Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
    /// Column writer for int64 type
    Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
    /// Column writer for int96 (timestamp) type
    Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
    /// Column writer for float type
    FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
    /// Column writer for double type
    DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
    /// Column writer for byte array type
    ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
    /// Column writer for fixed length byte array type
    FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
}
89
90impl ColumnWriter<'_> {
91    /// Returns the estimated total memory usage
92    #[cfg(feature = "arrow")]
93    pub(crate) fn memory_size(&self) -> usize {
94        downcast_writer!(self, typed, typed.memory_size())
95    }
96
97    /// Returns the estimated total encoded bytes for this column writer
98    #[cfg(feature = "arrow")]
99    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
100        downcast_writer!(self, typed, typed.get_estimated_total_bytes())
101    }
102
103    /// Close this [`ColumnWriter`], returning the metadata for the column chunk.
104    pub fn close(self) -> Result<ColumnCloseResult> {
105        downcast_writer!(self, typed, typed.close())
106    }
107}
108
109/// Create a specific column writer corresponding to column descriptor `descr`.
110pub fn get_column_writer<'a>(
111    descr: ColumnDescPtr,
112    props: WriterPropertiesPtr,
113    page_writer: Box<dyn PageWriter + 'a>,
114) -> ColumnWriter<'a> {
115    match descr.physical_type() {
116        Type::BOOLEAN => {
117            ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
118        }
119        Type::INT32 => {
120            ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
121        }
122        Type::INT64 => {
123            ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
124        }
125        Type::INT96 => {
126            ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
127        }
128        Type::FLOAT => {
129            ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
130        }
131        Type::DOUBLE => {
132            ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
133        }
134        Type::BYTE_ARRAY => {
135            ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
136        }
137        Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter(
138            ColumnWriterImpl::new(descr, props, page_writer),
139        ),
140    }
141}
142
143/// Gets a typed column writer for the specific type `T`, by "up-casting" `col_writer` of
144/// non-generic type to a generic column writer type `ColumnWriterImpl`.
145///
146/// Panics if actual enum value for `col_writer` does not match the type `T`.
147pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> {
148    T::get_column_writer(col_writer).unwrap_or_else(|| {
149        panic!(
150            "Failed to convert column writer into a typed column writer for `{}` type",
151            T::get_physical_type()
152        )
153    })
154}
155
156/// Similar to `get_typed_column_writer` but returns a reference.
157pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
158    col_writer: &'b ColumnWriter<'a>,
159) -> &'b ColumnWriterImpl<'a, T> {
160    T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
161        panic!(
162            "Failed to convert column writer into a typed column writer for `{}` type",
163            T::get_physical_type()
164        )
165    })
166}
167
168/// Similar to `get_typed_column_writer` but returns a reference.
169pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
170    col_writer: &'a mut ColumnWriter<'b>,
171) -> &'a mut ColumnWriterImpl<'b, T> {
172    T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
173        panic!(
174            "Failed to convert column writer into a typed column writer for `{}` type",
175            T::get_physical_type()
176        )
177    })
178}
179
/// Metadata for a column chunk of a Parquet file.
///
/// Note this structure is returned by [`ColumnWriter::close`] (and by
/// [`GenericColumnWriter::close`], which it delegates to).
#[derive(Debug, Clone)]
pub struct ColumnCloseResult {
    /// The total number of bytes written, as tracked by the column writer's metrics
    pub bytes_written: u64,
    /// The total number of rows written
    pub rows_written: u64,
    /// Metadata for this column chunk
    pub metadata: ColumnChunkMetaData,
    /// Optional bloom filter for this column, as flushed from the value encoder
    pub bloom_filter: Option<Sbbf>,
    /// Optional column index, for filtering; `None` when the column index builder was
    /// invalidated (for example when page-level statistics are not collected)
    pub column_index: Option<ColumnIndexMetaData>,
    /// Optional offset index, identifying page locations; `None` when the offset index
    /// was disabled in the writer properties
    pub offset_index: Option<OffsetIndexMetaData>,
}
198
/// Metrics accumulated for the data page that is currently being buffered.
///
/// Reset via `PageMetrics::new_page` when a new page is started.
#[derive(Default)]
struct PageMetrics {
    /// Number of levels buffered for this page; nulls are counted as well.
    num_buffered_values: u32,
    /// Number of rows (records) started in this page.
    num_buffered_rows: u32,
    /// Number of definition levels below the max definition level, i.e. nulls.
    num_page_nulls: u64,
    /// Page repetition level histogram; `None` unless statistics are enabled.
    repetition_level_histogram: Option<LevelHistogram>,
    /// Page definition level histogram; `None` unless statistics are enabled.
    definition_level_histogram: Option<LevelHistogram>,
}
208
209impl PageMetrics {
210    fn new() -> Self {
211        Default::default()
212    }
213
214    /// Initialize the repetition level histogram
215    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
216        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
217        self
218    }
219
220    /// Initialize the definition level histogram
221    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
222        self.definition_level_histogram = LevelHistogram::try_new(max_level);
223        self
224    }
225
226    /// Resets the state of this `PageMetrics` to the initial state.
227    /// If histograms have been initialized their contents will be reset to zero.
228    fn new_page(&mut self) {
229        self.num_buffered_values = 0;
230        self.num_buffered_rows = 0;
231        self.num_page_nulls = 0;
232        self.repetition_level_histogram
233            .as_mut()
234            .map(LevelHistogram::reset);
235        self.definition_level_histogram
236            .as_mut()
237            .map(LevelHistogram::reset);
238    }
239
240    /// Updates histogram values using provided repetition levels
241    fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
242        if let Some(ref mut rep_hist) = self.repetition_level_histogram {
243            rep_hist.update_from_levels(levels);
244        }
245    }
246
247    /// Updates histogram values using provided definition levels
248    fn update_definition_level_histogram(&mut self, levels: &[i16]) {
249        if let Some(ref mut def_hist) = self.definition_level_histogram {
250            def_hist.update_from_levels(levels);
251        }
252    }
253}
254
255// Metrics per column writer
256#[derive(Default)]
257struct ColumnMetrics<T: Default> {
258    total_bytes_written: u64,
259    total_rows_written: u64,
260    total_uncompressed_size: u64,
261    total_compressed_size: u64,
262    total_num_values: u64,
263    dictionary_page_offset: Option<u64>,
264    data_page_offset: Option<u64>,
265    min_column_value: Option<T>,
266    max_column_value: Option<T>,
267    num_column_nulls: u64,
268    column_distinct_count: Option<u64>,
269    variable_length_bytes: Option<i64>,
270    repetition_level_histogram: Option<LevelHistogram>,
271    definition_level_histogram: Option<LevelHistogram>,
272}
273
274impl<T: Default> ColumnMetrics<T> {
275    fn new() -> Self {
276        Default::default()
277    }
278
279    /// Initialize the repetition level histogram
280    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
281        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
282        self
283    }
284
285    /// Initialize the definition level histogram
286    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
287        self.definition_level_histogram = LevelHistogram::try_new(max_level);
288        self
289    }
290
291    /// Sum `page_histogram` into `chunk_histogram`
292    fn update_histogram(
293        chunk_histogram: &mut Option<LevelHistogram>,
294        page_histogram: &Option<LevelHistogram>,
295    ) {
296        if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
297            chunk_hist.add(page_hist);
298        }
299    }
300
301    /// Sum the provided PageMetrics histograms into the chunk histograms. Does nothing if
302    /// page histograms are not initialized.
303    fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
304        ColumnMetrics::<T>::update_histogram(
305            &mut self.definition_level_histogram,
306            &page_metrics.definition_level_histogram,
307        );
308        ColumnMetrics::<T>::update_histogram(
309            &mut self.repetition_level_histogram,
310            &page_metrics.repetition_level_histogram,
311        );
312    }
313
314    /// Sum the provided page variable_length_bytes into the chunk variable_length_bytes
315    fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
316        if let Some(var_bytes) = variable_length_bytes {
317            *self.variable_length_bytes.get_or_insert(0) += var_bytes;
318        }
319    }
320}
321
/// Typed column writer for a primitive column.
///
/// Convenience alias for a [`GenericColumnWriter`] driven by the
/// `ColumnValueEncoderImpl` for data type `T`.
pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
324
/// Generic column writer for a primitive Parquet column
///
/// Values are encoded by the value encoder `E`, and levels are buffered per page. Finished
/// pages are handed to the boxed [`PageWriter`].
pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
    // Column writer properties: the column descriptor, the writer properties and the
    // statistics level resolved from them for this column's path.
    descr: ColumnDescPtr,
    props: WriterPropertiesPtr,
    statistics_enabled: EnabledStatistics,

    /// Destination for finished pages.
    page_writer: Box<dyn PageWriter + 'a>,
    /// Compression configured for this column's path.
    codec: Compression,
    /// Compressor created from `codec` (presumably `None` when no compression is
    /// needed; see `create_codec`).
    compressor: Option<Box<dyn Codec>>,
    /// Encoder for the column values.
    encoder: E,

    /// Metrics for the page currently being buffered.
    page_metrics: PageMetrics,
    /// Metrics accumulated over the whole column chunk.
    column_metrics: ColumnMetrics<E::T>,

    /// The order of encodings within the generated metadata does not impact its meaning,
    /// but we use a BTreeSet so that the output is deterministic
    encodings: BTreeSet<Encoding>,
    encoding_stats: Vec<PageEncodingStats>,
    // Reused buffers: definition/repetition levels for the page being buffered, and
    // compressed data pages held until `flush_data_pages` writes them out.
    def_levels_sink: Vec<i16>,
    rep_levels_sink: Vec<i16>,
    data_pages: VecDeque<CompressedPage>,
    // Column index (invalidated unless page statistics are collected) and offset index
    // (`None` when disabled in the writer properties).
    column_index_builder: ColumnIndexBuilder,
    offset_index_builder: Option<OffsetIndexBuilder>,

    // Below fields used to incrementally check boundary order across data pages.
    // We assume they are ascending/descending until proven wrong.
    data_page_boundary_ascending: bool,
    data_page_boundary_descending: bool,
    /// (min, max) of the most recent data page that was not entirely null
    last_non_null_data_page_min_max: Option<(E::T, E::T)>,
}
360
361impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
362    /// Returns a new instance of [`GenericColumnWriter`].
363    pub fn new(
364        descr: ColumnDescPtr,
365        props: WriterPropertiesPtr,
366        page_writer: Box<dyn PageWriter + 'a>,
367    ) -> Self {
368        let codec = props.compression(descr.path());
369        let codec_options = CodecOptionsBuilder::default().build();
370        let compressor = create_codec(codec, &codec_options).unwrap();
371        let encoder = E::try_new(&descr, props.as_ref()).unwrap();
372
373        let statistics_enabled = props.statistics_enabled(descr.path());
374
375        let mut encodings = BTreeSet::new();
376        // Used for level information
377        encodings.insert(Encoding::RLE);
378
379        let mut page_metrics = PageMetrics::new();
380        let mut column_metrics = ColumnMetrics::<E::T>::new();
381
382        // Initialize level histograms if collecting page or chunk statistics
383        if statistics_enabled != EnabledStatistics::None {
384            page_metrics = page_metrics
385                .with_repetition_level_histogram(descr.max_rep_level())
386                .with_definition_level_histogram(descr.max_def_level());
387            column_metrics = column_metrics
388                .with_repetition_level_histogram(descr.max_rep_level())
389                .with_definition_level_histogram(descr.max_def_level())
390        }
391
392        // Disable column_index_builder if not collecting page statistics.
393        let mut column_index_builder = ColumnIndexBuilder::new(descr.physical_type());
394        if statistics_enabled != EnabledStatistics::Page {
395            column_index_builder.to_invalid()
396        }
397
398        // Disable offset_index_builder if requested by user.
399        let offset_index_builder = match props.offset_index_disabled() {
400            false => Some(OffsetIndexBuilder::new()),
401            _ => None,
402        };
403
404        Self {
405            descr,
406            props,
407            statistics_enabled,
408            page_writer,
409            codec,
410            compressor,
411            encoder,
412            def_levels_sink: vec![],
413            rep_levels_sink: vec![],
414            data_pages: VecDeque::new(),
415            page_metrics,
416            column_metrics,
417            column_index_builder,
418            offset_index_builder,
419            encodings,
420            encoding_stats: vec![],
421            data_page_boundary_ascending: true,
422            data_page_boundary_descending: true,
423            last_non_null_data_page_min_max: None,
424        }
425    }
426
427    #[allow(clippy::too_many_arguments)]
428    pub(crate) fn write_batch_internal(
429        &mut self,
430        values: &E::Values,
431        value_indices: Option<&[usize]>,
432        def_levels: Option<&[i16]>,
433        rep_levels: Option<&[i16]>,
434        min: Option<&E::T>,
435        max: Option<&E::T>,
436        distinct_count: Option<u64>,
437    ) -> Result<usize> {
438        // Check if number of definition levels is the same as number of repetition levels.
439        if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
440            if def.len() != rep.len() {
441                return Err(general_err!(
442                    "Inconsistent length of definition and repetition levels: {} != {}",
443                    def.len(),
444                    rep.len()
445                ));
446            }
447        }
448
449        // We check for DataPage limits only after we have inserted the values. If a user
450        // writes a large number of values, the DataPage size can be well above the limit.
451        //
452        // The purpose of this chunking is to bound this. Even if a user writes large
453        // number of values, the chunking will ensure that we add data page at a
454        // reasonable pagesize limit.
455
456        // TODO: find out why we don't account for size of levels when we estimate page
457        // size.
458
459        let num_levels = match def_levels {
460            Some(def_levels) => def_levels.len(),
461            None => values.len(),
462        };
463
464        if let Some(min) = min {
465            update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
466        }
467        if let Some(max) = max {
468            update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
469        }
470
471        // We can only set the distinct count if there are no other writes
472        if self.encoder.num_values() == 0 {
473            self.column_metrics.column_distinct_count = distinct_count;
474        } else {
475            self.column_metrics.column_distinct_count = None;
476        }
477
478        let mut values_offset = 0;
479        let mut levels_offset = 0;
480        let base_batch_size = self.props.write_batch_size();
481        while levels_offset < num_levels {
482            let mut end_offset = num_levels.min(levels_offset + base_batch_size);
483
484            // Split at record boundary
485            if let Some(r) = rep_levels {
486                while end_offset < r.len() && r[end_offset] != 0 {
487                    end_offset += 1;
488                }
489            }
490
491            values_offset += self.write_mini_batch(
492                values,
493                values_offset,
494                value_indices,
495                end_offset - levels_offset,
496                def_levels.map(|lv| &lv[levels_offset..end_offset]),
497                rep_levels.map(|lv| &lv[levels_offset..end_offset]),
498            )?;
499            levels_offset = end_offset;
500        }
501
502        // Return total number of values processed.
503        Ok(values_offset)
504    }
505
506    /// Writes batch of values, definition levels and repetition levels.
507    /// Returns number of values processed (written).
508    ///
509    /// If definition and repetition levels are provided, we write fully those levels and
510    /// select how many values to write (this number will be returned), since number of
511    /// actual written values may be smaller than provided values.
512    ///
513    /// If only values are provided, then all values are written and the length of
514    /// of the values buffer is returned.
515    ///
516    /// Definition and/or repetition levels can be omitted, if values are
517    /// non-nullable and/or non-repeated.
518    pub fn write_batch(
519        &mut self,
520        values: &E::Values,
521        def_levels: Option<&[i16]>,
522        rep_levels: Option<&[i16]>,
523    ) -> Result<usize> {
524        self.write_batch_internal(values, None, def_levels, rep_levels, None, None, None)
525    }
526
527    /// Writer may optionally provide pre-calculated statistics for use when computing
528    /// chunk-level statistics
529    ///
530    /// NB: [`WriterProperties::statistics_enabled`] must be set to [`EnabledStatistics::Chunk`]
531    /// for these statistics to take effect. If [`EnabledStatistics::None`] they will be ignored,
532    /// and if [`EnabledStatistics::Page`] the chunk statistics will instead be computed from the
533    /// computed page statistics
534    pub fn write_batch_with_statistics(
535        &mut self,
536        values: &E::Values,
537        def_levels: Option<&[i16]>,
538        rep_levels: Option<&[i16]>,
539        min: Option<&E::T>,
540        max: Option<&E::T>,
541        distinct_count: Option<u64>,
542    ) -> Result<usize> {
543        self.write_batch_internal(
544            values,
545            None,
546            def_levels,
547            rep_levels,
548            min,
549            max,
550            distinct_count,
551        )
552    }
553
554    /// Returns the estimated total memory usage.
555    ///
556    /// Unlike [`Self::get_estimated_total_bytes`] this is an estimate
557    /// of the current memory usage and not the final anticipated encoded size.
558    #[cfg(feature = "arrow")]
559    pub(crate) fn memory_size(&self) -> usize {
560        self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
561    }
562
563    /// Returns total number of bytes written by this column writer so far.
564    /// This value is also returned when column writer is closed.
565    ///
566    /// Note: this value does not include any buffered data that has not
567    /// yet been flushed to a page.
568    pub fn get_total_bytes_written(&self) -> u64 {
569        self.column_metrics.total_bytes_written
570    }
571
572    /// Returns the estimated total encoded bytes for this column writer.
573    ///
574    /// Unlike [`Self::get_total_bytes_written`] this includes an estimate
575    /// of any data that has not yet been flushed to a page, based on it's
576    /// anticipated encoded size.
577    #[cfg(feature = "arrow")]
578    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
579        self.data_pages
580            .iter()
581            .map(|page| page.data().len() as u64)
582            .sum::<u64>()
583            + self.column_metrics.total_bytes_written
584            + self.encoder.estimated_data_page_size() as u64
585            + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64
586    }
587
588    /// Returns total number of rows written by this column writer so far.
589    /// This value is also returned when column writer is closed.
590    pub fn get_total_rows_written(&self) -> u64 {
591        self.column_metrics.total_rows_written
592    }
593
594    /// Returns a reference to a [`ColumnDescPtr`]
595    pub fn get_descriptor(&self) -> &ColumnDescPtr {
596        &self.descr
597    }
598
599    /// Finalizes writes and closes the column writer.
600    /// Returns total bytes written, total rows written and column chunk metadata.
601    pub fn close(mut self) -> Result<ColumnCloseResult> {
602        if self.page_metrics.num_buffered_values > 0 {
603            self.add_data_page()?;
604        }
605        if self.encoder.has_dictionary() {
606            self.write_dictionary_page()?;
607        }
608        self.flush_data_pages()?;
609        let metadata = self.build_column_metadata()?;
610        self.page_writer.close()?;
611
612        let boundary_order = match (
613            self.data_page_boundary_ascending,
614            self.data_page_boundary_descending,
615        ) {
616            // If the lists are composed of equal elements then will be marked as ascending
617            // (Also the case if all pages are null pages)
618            (true, _) => BoundaryOrder::ASCENDING,
619            (false, true) => BoundaryOrder::DESCENDING,
620            (false, false) => BoundaryOrder::UNORDERED,
621        };
622        self.column_index_builder.set_boundary_order(boundary_order);
623
624        let column_index = match self.column_index_builder.valid() {
625            true => Some(self.column_index_builder.build()?),
626            false => None,
627        };
628
629        let offset_index = self.offset_index_builder.map(|b| b.build());
630
631        Ok(ColumnCloseResult {
632            bytes_written: self.column_metrics.total_bytes_written,
633            rows_written: self.column_metrics.total_rows_written,
634            bloom_filter: self.encoder.flush_bloom_filter(),
635            metadata,
636            column_index,
637            offset_index,
638        })
639    }
640
641    /// Writes mini batch of values, definition and repetition levels.
642    /// This allows fine-grained processing of values and maintaining a reasonable
643    /// page size.
644    fn write_mini_batch(
645        &mut self,
646        values: &E::Values,
647        values_offset: usize,
648        value_indices: Option<&[usize]>,
649        num_levels: usize,
650        def_levels: Option<&[i16]>,
651        rep_levels: Option<&[i16]>,
652    ) -> Result<usize> {
653        // Process definition levels and determine how many values to write.
654        let values_to_write = if self.descr.max_def_level() > 0 {
655            let levels = def_levels.ok_or_else(|| {
656                general_err!(
657                    "Definition levels are required, because max definition level = {}",
658                    self.descr.max_def_level()
659                )
660            })?;
661
662            let values_to_write = levels
663                .iter()
664                .map(|level| (*level == self.descr.max_def_level()) as usize)
665                .sum();
666            self.page_metrics.num_page_nulls += (levels.len() - values_to_write) as u64;
667
668            // Update histogram
669            self.page_metrics.update_definition_level_histogram(levels);
670
671            self.def_levels_sink.extend_from_slice(levels);
672            values_to_write
673        } else {
674            num_levels
675        };
676
677        // Process repetition levels and determine how many rows we are about to process.
678        if self.descr.max_rep_level() > 0 {
679            // A row could contain more than one value.
680            let levels = rep_levels.ok_or_else(|| {
681                general_err!(
682                    "Repetition levels are required, because max repetition level = {}",
683                    self.descr.max_rep_level()
684                )
685            })?;
686
687            if !levels.is_empty() && levels[0] != 0 {
688                return Err(general_err!(
689                    "Write must start at a record boundary, got non-zero repetition level of {}",
690                    levels[0]
691                ));
692            }
693
694            // Count the occasions where we start a new row
695            for &level in levels {
696                self.page_metrics.num_buffered_rows += (level == 0) as u32
697            }
698
699            // Update histogram
700            self.page_metrics.update_repetition_level_histogram(levels);
701
702            self.rep_levels_sink.extend_from_slice(levels);
703        } else {
704            // Each value is exactly one row.
705            // Equals to the number of values, we count nulls as well.
706            self.page_metrics.num_buffered_rows += num_levels as u32;
707        }
708
709        match value_indices {
710            Some(indices) => {
711                let indices = &indices[values_offset..values_offset + values_to_write];
712                self.encoder.write_gather(values, indices)?;
713            }
714            None => self.encoder.write(values, values_offset, values_to_write)?,
715        }
716
717        self.page_metrics.num_buffered_values += num_levels as u32;
718
719        if self.should_add_data_page() {
720            self.add_data_page()?;
721        }
722
723        if self.should_dict_fallback() {
724            self.dict_fallback()?;
725        }
726
727        Ok(values_to_write)
728    }
729
730    /// Returns true if we need to fall back to non-dictionary encoding.
731    ///
732    /// We can only fall back if dictionary encoder is set and we have exceeded dictionary
733    /// size.
734    #[inline]
735    fn should_dict_fallback(&self) -> bool {
736        match self.encoder.estimated_dict_page_size() {
737            Some(size) => {
738                size >= self
739                    .props
740                    .column_dictionary_page_size_limit(self.descr.path())
741            }
742            None => false,
743        }
744    }
745
746    /// Returns true if there is enough data for a data page, false otherwise.
747    #[inline]
748    fn should_add_data_page(&self) -> bool {
749        // This is necessary in the event of a much larger dictionary size than page size
750        //
751        // In such a scenario the dictionary decoder may return an estimated encoded
752        // size in excess of the page size limit, even when there are no buffered values
753        if self.page_metrics.num_buffered_values == 0 {
754            return false;
755        }
756
757        self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
758            || self.encoder.estimated_data_page_size() >= self.props.data_page_size_limit()
759    }
760
761    /// Performs dictionary fallback.
762    /// Prepares and writes dictionary and all data pages into page writer.
763    fn dict_fallback(&mut self) -> Result<()> {
764        // At this point we know that we need to fall back.
765        if self.page_metrics.num_buffered_values > 0 {
766            self.add_data_page()?;
767        }
768        self.write_dictionary_page()?;
769        self.flush_data_pages()?;
770        Ok(())
771    }
772
773    /// Update the column index and offset index when adding the data page
774    fn update_column_offset_index(
775        &mut self,
776        page_statistics: Option<&ValueStatistics<E::T>>,
777        page_variable_length_bytes: Option<i64>,
778    ) {
779        // update the column index
780        let null_page =
781            (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
782        // a page contains only null values,
783        // and writers have to set the corresponding entries in min_values and max_values to byte[0]
784        if null_page && self.column_index_builder.valid() {
785            self.column_index_builder.append(
786                null_page,
787                vec![],
788                vec![],
789                self.page_metrics.num_page_nulls as i64,
790            );
791        } else if self.column_index_builder.valid() {
792            // from page statistics
793            // If can't get the page statistics, ignore this column/offset index for this column chunk
794            match &page_statistics {
795                None => {
796                    self.column_index_builder.to_invalid();
797                }
798                Some(stat) => {
799                    // Check if min/max are still ascending/descending across pages
800                    let new_min = stat.min_opt().unwrap();
801                    let new_max = stat.max_opt().unwrap();
802                    if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
803                        if self.data_page_boundary_ascending {
804                            // If last min/max are greater than new min/max then not ascending anymore
805                            let not_ascending = compare_greater(&self.descr, last_min, new_min)
806                                || compare_greater(&self.descr, last_max, new_max);
807                            if not_ascending {
808                                self.data_page_boundary_ascending = false;
809                            }
810                        }
811
812                        if self.data_page_boundary_descending {
813                            // If new min/max are greater than last min/max then not descending anymore
814                            let not_descending = compare_greater(&self.descr, new_min, last_min)
815                                || compare_greater(&self.descr, new_max, last_max);
816                            if not_descending {
817                                self.data_page_boundary_descending = false;
818                            }
819                        }
820                    }
821                    self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));
822
823                    if self.can_truncate_value() {
824                        self.column_index_builder.append(
825                            null_page,
826                            self.truncate_min_value(
827                                self.props.column_index_truncate_length(),
828                                stat.min_bytes_opt().unwrap(),
829                            )
830                            .0,
831                            self.truncate_max_value(
832                                self.props.column_index_truncate_length(),
833                                stat.max_bytes_opt().unwrap(),
834                            )
835                            .0,
836                            self.page_metrics.num_page_nulls as i64,
837                        );
838                    } else {
839                        self.column_index_builder.append(
840                            null_page,
841                            stat.min_bytes_opt().unwrap().to_vec(),
842                            stat.max_bytes_opt().unwrap().to_vec(),
843                            self.page_metrics.num_page_nulls as i64,
844                        );
845                    }
846                }
847            }
848        }
849
850        // Append page histograms to the `ColumnIndex` histograms
851        self.column_index_builder.append_histograms(
852            &self.page_metrics.repetition_level_histogram,
853            &self.page_metrics.definition_level_histogram,
854        );
855
856        // Update the offset index
857        if let Some(builder) = self.offset_index_builder.as_mut() {
858            builder.append_row_count(self.page_metrics.num_buffered_rows as i64);
859            builder.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
860        }
861    }
862
863    /// Determine if we should allow truncating min/max values for this column's statistics
864    fn can_truncate_value(&self) -> bool {
865        match self.descr.physical_type() {
866            // Don't truncate for Float16 and Decimal because their sort order is different
867            // from that of FIXED_LEN_BYTE_ARRAY sort order.
868            // So truncation of those types could lead to inaccurate min/max statistics
869            Type::FIXED_LEN_BYTE_ARRAY
870                if !matches!(
871                    self.descr.logical_type(),
872                    Some(LogicalType::Decimal { .. }) | Some(LogicalType::Float16)
873                ) =>
874            {
875                true
876            }
877            Type::BYTE_ARRAY => true,
878            // Truncation only applies for fba/binary physical types
879            _ => false,
880        }
881    }
882
883    /// Returns `true` if this column's logical type is a UTF-8 string.
884    fn is_utf8(&self) -> bool {
885        self.get_descriptor().logical_type() == Some(LogicalType::String)
886            || self.get_descriptor().converted_type() == ConvertedType::UTF8
887    }
888
889    /// Truncates a binary statistic to at most `truncation_length` bytes.
890    ///
891    /// If truncation is not possible, returns `data`.
892    ///
893    /// The `bool` in the returned tuple indicates whether truncation occurred or not.
894    ///
895    /// UTF-8 Note:
896    /// If the column type indicates UTF-8, and `data` contains valid UTF-8, then the result will
897    /// also remain valid UTF-8, but may be less tnan `truncation_length` bytes to avoid splitting
898    /// on non-character boundaries.
899    fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
900        truncation_length
901            .filter(|l| data.len() > *l)
902            .and_then(|l|
903                // don't do extra work if this column isn't UTF-8
904                if self.is_utf8() {
905                    match str::from_utf8(data) {
906                        Ok(str_data) => truncate_utf8(str_data, l),
907                        Err(_) => Some(data[..l].to_vec()),
908                    }
909                } else {
910                    Some(data[..l].to_vec())
911                }
912            )
913            .map(|truncated| (truncated, true))
914            .unwrap_or_else(|| (data.to_vec(), false))
915    }
916
917    /// Truncates a binary statistic to at most `truncation_length` bytes, and then increment the
918    /// final byte(s) to yield a valid upper bound. This may result in a result of less than
919    /// `truncation_length` bytes if the last byte(s) overflows.
920    ///
921    /// If truncation is not possible, returns `data`.
922    ///
923    /// The `bool` in the returned tuple indicates whether truncation occurred or not.
924    ///
925    /// UTF-8 Note:
926    /// If the column type indicates UTF-8, and `data` contains valid UTF-8, then the result will
927    /// also remain valid UTF-8 (but again may be less than `truncation_length` bytes). If `data`
928    /// does not contain valid UTF-8, then truncation will occur as if the column is non-string
929    /// binary.
930    fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
931        truncation_length
932            .filter(|l| data.len() > *l)
933            .and_then(|l|
934                // don't do extra work if this column isn't UTF-8
935                if self.is_utf8() {
936                    match str::from_utf8(data) {
937                        Ok(str_data) => truncate_and_increment_utf8(str_data, l),
938                        Err(_) => increment(data[..l].to_vec()),
939                    }
940                } else {
941                    increment(data[..l].to_vec())
942                }
943            )
944            .map(|truncated| (truncated, true))
945            .unwrap_or_else(|| (data.to_vec(), false))
946    }
947
948    /// Truncate the min and max values that will be written to a data page
949    /// header or column chunk Statistics
950    fn truncate_statistics(&self, statistics: Statistics) -> Statistics {
951        let backwards_compatible_min_max = self.descr.sort_order().is_signed();
952        match statistics {
953            Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
954                let (min, did_truncate_min) = self.truncate_min_value(
955                    self.props.statistics_truncate_length(),
956                    stats.min_bytes_opt().unwrap(),
957                );
958                let (max, did_truncate_max) = self.truncate_max_value(
959                    self.props.statistics_truncate_length(),
960                    stats.max_bytes_opt().unwrap(),
961                );
962                Statistics::ByteArray(
963                    ValueStatistics::new(
964                        Some(min.into()),
965                        Some(max.into()),
966                        stats.distinct_count(),
967                        stats.null_count_opt(),
968                        backwards_compatible_min_max,
969                    )
970                    .with_max_is_exact(!did_truncate_max)
971                    .with_min_is_exact(!did_truncate_min),
972                )
973            }
974            Statistics::FixedLenByteArray(stats)
975                if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
976            {
977                let (min, did_truncate_min) = self.truncate_min_value(
978                    self.props.statistics_truncate_length(),
979                    stats.min_bytes_opt().unwrap(),
980                );
981                let (max, did_truncate_max) = self.truncate_max_value(
982                    self.props.statistics_truncate_length(),
983                    stats.max_bytes_opt().unwrap(),
984                );
985                Statistics::FixedLenByteArray(
986                    ValueStatistics::new(
987                        Some(min.into()),
988                        Some(max.into()),
989                        stats.distinct_count(),
990                        stats.null_count_opt(),
991                        backwards_compatible_min_max,
992                    )
993                    .with_max_is_exact(!did_truncate_max)
994                    .with_min_is_exact(!did_truncate_min),
995                )
996            }
997            stats => stats,
998        }
999    }
1000
    /// Adds data page.
    /// Data page is either buffered in case of dictionary encoding or written directly.
    ///
    /// Turns the currently buffered values and levels into one data page:
    /// 1. flushes the encoded values from the encoder, adds the page's null count to the
    ///    column metrics and folds the page min/max into the chunk min/max;
    /// 2. records the page in the column index / offset index and the column metrics;
    /// 3. encodes repetition/definition levels and assembles a `DataPage` (writer version 1.0)
    ///    or `DataPageV2` (writer version 2.0), compressing it if a compressor is configured;
    /// 4. pushes the page onto `data_pages` while the encoder has a dictionary, otherwise
    ///    writes it to the page writer immediately;
    /// 5. resets the per-page state.
    ///
    /// Returns an error if flushing the encoder, compressing, or writing the page fails.
    fn add_data_page(&mut self) -> Result<()> {
        // Extract encoded values
        let values_data = self.encoder.flush_data_page()?;

        let max_def_level = self.descr.max_def_level();
        let max_rep_level = self.descr.max_rep_level();

        self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;

        // Page-level statistics are only built when statistics are enabled at page
        // granularity; the chunk-level min/max is updated whenever the encoder produced both.
        let page_statistics = match (values_data.min_value, values_data.max_value) {
            (Some(min), Some(max)) => {
                // Update chunk level statistics
                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);

                (self.statistics_enabled == EnabledStatistics::Page).then_some(
                    ValueStatistics::new(
                        Some(min),
                        Some(max),
                        None,
                        Some(self.page_metrics.num_page_nulls),
                        false,
                    ),
                )
            }
            _ => None,
        };

        // update column and offset index
        // (must happen before `page_metrics` is reset at the end of this method)
        self.update_column_offset_index(
            page_statistics.as_ref(),
            values_data.variable_length_bytes,
        );

        // Update histograms and variable_length_bytes in column_metrics
        self.column_metrics
            .update_from_page_metrics(&self.page_metrics);
        self.column_metrics
            .update_variable_length_bytes(values_data.variable_length_bytes);

        // From here on, we only need page statistics if they will be written to the page header.
        let page_statistics = page_statistics
            .filter(|_| self.props.write_page_header_statistics(self.descr.path()))
            .map(|stats| self.truncate_statistics(Statistics::from(stats)));

        let compressed_page = match self.props.writer_version() {
            WriterVersion::PARQUET_1_0 => {
                // V1 layout: [rep levels][def levels][values], each level section RLE
                // encoded, and the whole buffer compressed together.
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.rep_levels_sink[..],
                            max_rep_level,
                        )[..],
                    );
                }

                if max_def_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.def_levels_sink[..],
                            max_def_level,
                        )[..],
                    );
                }

                buffer.extend_from_slice(&values_data.buf);
                let uncompressed_size = buffer.len();

                if let Some(ref mut cmpr) = self.compressor {
                    let mut compressed_buf = Vec::with_capacity(uncompressed_size);
                    cmpr.compress(&buffer[..], &mut compressed_buf)?;
                    compressed_buf.shrink_to_fit();
                    buffer = compressed_buf;
                }

                let data_page = Page::DataPage {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    def_level_encoding: Encoding::RLE,
                    rep_level_encoding: Encoding::RLE,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
            WriterVersion::PARQUET_2_0 => {
                // V2 layout: [rep levels][def levels][values]; the level byte lengths are
                // recorded in the page so the levels can stay uncompressed.
                let mut rep_levels_byte_len = 0;
                let mut def_levels_byte_len = 0;
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    let levels = self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
                    rep_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                if max_def_level > 0 {
                    let levels = self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
                    def_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                let uncompressed_size =
                    rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();

                // Data Page v2 compresses values only.
                // The compressed values are appended after the levels; if they are not smaller
                // than `uncompressed_size`, they are replaced by the raw values and the page is
                // marked as not compressed.
                // NOTE(review): `uncompressed_size` includes the level bytes, so this compares
                // the compressed values against levels + values rather than values alone —
                // confirm this threshold is intended.
                let is_compressed = match self.compressor {
                    Some(ref mut cmpr) => {
                        let buffer_len = buffer.len();
                        cmpr.compress(&values_data.buf, &mut buffer)?;
                        if uncompressed_size <= buffer.len() - buffer_len {
                            buffer.truncate(buffer_len);
                            buffer.extend_from_slice(&values_data.buf);
                            false
                        } else {
                            true
                        }
                    }
                    None => {
                        buffer.extend_from_slice(&values_data.buf);
                        false
                    }
                };

                let data_page = Page::DataPageV2 {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    num_nulls: self.page_metrics.num_page_nulls as u32,
                    num_rows: self.page_metrics.num_buffered_rows,
                    def_levels_byte_len: def_levels_byte_len as u32,
                    rep_levels_byte_len: rep_levels_byte_len as u32,
                    is_compressed,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
        };

        // Check if we need to buffer data page or flush it to the sink directly.
        // While a dictionary is in use, pages are held back so they can be written after the
        // dictionary page (see `dict_fallback` / `flush_data_pages`).
        if self.encoder.has_dictionary() {
            self.data_pages.push_back(compressed_page);
        } else {
            self.write_data_page(compressed_page)?;
        }

        // Update total number of rows.
        self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;

        // Reset state.
        self.rep_levels_sink.clear();
        self.def_levels_sink.clear();
        self.page_metrics.new_page();

        Ok(())
    }
1165
1166    /// Finalises any outstanding data pages and flushes buffered data pages from
1167    /// dictionary encoding into underlying sink.
1168    #[inline]
1169    fn flush_data_pages(&mut self) -> Result<()> {
1170        // Write all outstanding data to a new page.
1171        if self.page_metrics.num_buffered_values > 0 {
1172            self.add_data_page()?;
1173        }
1174
1175        while let Some(page) = self.data_pages.pop_front() {
1176            self.write_data_page(page)?;
1177        }
1178
1179        Ok(())
1180    }
1181
1182    /// Assembles column chunk metadata.
1183    fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
1184        let total_compressed_size = self.column_metrics.total_compressed_size as i64;
1185        let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
1186        let num_values = self.column_metrics.total_num_values as i64;
1187        let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
1188        // If data page offset is not set, then no pages have been written
1189        let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;
1190
1191        let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
1192            .set_compression(self.codec)
1193            .set_encodings(self.encodings.iter().cloned().collect())
1194            .set_page_encoding_stats(self.encoding_stats.clone())
1195            .set_total_compressed_size(total_compressed_size)
1196            .set_total_uncompressed_size(total_uncompressed_size)
1197            .set_num_values(num_values)
1198            .set_data_page_offset(data_page_offset)
1199            .set_dictionary_page_offset(dict_page_offset);
1200
1201        if self.statistics_enabled != EnabledStatistics::None {
1202            let backwards_compatible_min_max = self.descr.sort_order().is_signed();
1203
1204            let statistics = ValueStatistics::<E::T>::new(
1205                self.column_metrics.min_column_value.clone(),
1206                self.column_metrics.max_column_value.clone(),
1207                self.column_metrics.column_distinct_count,
1208                Some(self.column_metrics.num_column_nulls),
1209                false,
1210            )
1211            .with_backwards_compatible_min_max(backwards_compatible_min_max)
1212            .into();
1213
1214            let statistics = self.truncate_statistics(statistics);
1215
1216            builder = builder
1217                .set_statistics(statistics)
1218                .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
1219                .set_repetition_level_histogram(
1220                    self.column_metrics.repetition_level_histogram.take(),
1221                )
1222                .set_definition_level_histogram(
1223                    self.column_metrics.definition_level_histogram.take(),
1224                );
1225
1226            if let Some(geo_stats) = self.encoder.flush_geospatial_statistics() {
1227                builder = builder.set_geo_statistics(geo_stats);
1228            }
1229        }
1230
1231        builder = self.set_column_chunk_encryption_properties(builder);
1232
1233        let metadata = builder.build()?;
1234        Ok(metadata)
1235    }
1236
1237    /// Encodes definition or repetition levels for Data Page v1.
1238    #[inline]
1239    fn encode_levels_v1(&self, encoding: Encoding, levels: &[i16], max_level: i16) -> Vec<u8> {
1240        let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
1241        encoder.put(levels);
1242        encoder.consume()
1243    }
1244
1245    /// Encodes definition or repetition levels for Data Page v2.
1246    /// Encoding is always RLE.
1247    #[inline]
1248    fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
1249        let mut encoder = LevelEncoder::v2(max_level, levels.len());
1250        encoder.put(levels);
1251        encoder.consume()
1252    }
1253
1254    /// Writes compressed data page into underlying sink and updates global metrics.
1255    #[inline]
1256    fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
1257        self.encodings.insert(page.encoding());
1258        match self.encoding_stats.last_mut() {
1259            Some(encoding_stats)
1260                if encoding_stats.page_type == page.page_type()
1261                    && encoding_stats.encoding == page.encoding() =>
1262            {
1263                encoding_stats.count += 1;
1264            }
1265            _ => {
1266                // data page type does not change inside a file
1267                // encoding can currently only change from dictionary to non-dictionary once
1268                self.encoding_stats.push(PageEncodingStats {
1269                    page_type: page.page_type(),
1270                    encoding: page.encoding(),
1271                    count: 1,
1272                });
1273            }
1274        }
1275        let page_spec = self.page_writer.write_page(page)?;
1276        // update offset index
1277        // compressed_size = header_size + compressed_data_size
1278        if let Some(builder) = self.offset_index_builder.as_mut() {
1279            builder
1280                .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32)
1281        }
1282        self.update_metrics_for_page(page_spec);
1283        Ok(())
1284    }
1285
1286    /// Writes dictionary page into underlying sink.
1287    #[inline]
1288    fn write_dictionary_page(&mut self) -> Result<()> {
1289        let compressed_page = {
1290            let mut page = self
1291                .encoder
1292                .flush_dict_page()?
1293                .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;
1294
1295            let uncompressed_size = page.buf.len();
1296
1297            if let Some(ref mut cmpr) = self.compressor {
1298                let mut output_buf = Vec::with_capacity(uncompressed_size);
1299                cmpr.compress(&page.buf, &mut output_buf)?;
1300                page.buf = Bytes::from(output_buf);
1301            }
1302
1303            let dict_page = Page::DictionaryPage {
1304                buf: page.buf,
1305                num_values: page.num_values as u32,
1306                encoding: self.props.dictionary_page_encoding(),
1307                is_sorted: page.is_sorted,
1308            };
1309            CompressedPage::new(dict_page, uncompressed_size)
1310        };
1311
1312        self.encodings.insert(compressed_page.encoding());
1313        self.encoding_stats.push(PageEncodingStats {
1314            page_type: PageType::DICTIONARY_PAGE,
1315            encoding: compressed_page.encoding(),
1316            count: 1,
1317        });
1318        let page_spec = self.page_writer.write_page(compressed_page)?;
1319        self.update_metrics_for_page(page_spec);
1320        // For the directory page, don't need to update column/offset index.
1321        Ok(())
1322    }
1323
1324    /// Updates column writer metrics with each page metadata.
1325    #[inline]
1326    fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
1327        self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
1328        self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
1329        self.column_metrics.total_bytes_written += page_spec.bytes_written;
1330
1331        match page_spec.page_type {
1332            PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
1333                self.column_metrics.total_num_values += page_spec.num_values as u64;
1334                if self.column_metrics.data_page_offset.is_none() {
1335                    self.column_metrics.data_page_offset = Some(page_spec.offset);
1336                }
1337            }
1338            PageType::DICTIONARY_PAGE => {
1339                assert!(
1340                    self.column_metrics.dictionary_page_offset.is_none(),
1341                    "Dictionary offset is already set"
1342                );
1343                self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
1344            }
1345            _ => {}
1346        }
1347    }
1348
1349    #[inline]
1350    #[cfg(feature = "encryption")]
1351    fn set_column_chunk_encryption_properties(
1352        &self,
1353        builder: ColumnChunkMetaDataBuilder,
1354    ) -> ColumnChunkMetaDataBuilder {
1355        if let Some(encryption_properties) = self.props.file_encryption_properties.as_ref() {
1356            builder.set_column_crypto_metadata(get_column_crypto_metadata(
1357                encryption_properties,
1358                &self.descr,
1359            ))
1360        } else {
1361            builder
1362        }
1363    }
1364
1365    #[inline]
1366    #[cfg(not(feature = "encryption"))]
1367    fn set_column_chunk_encryption_properties(
1368        &self,
1369        builder: ColumnChunkMetaDataBuilder,
1370    ) -> ColumnChunkMetaDataBuilder {
1371        builder
1372    }
1373}
1374
1375fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
1376    update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val))
1377}
1378
1379fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) {
1380    update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur))
1381}
1382
1383#[inline]
1384#[allow(clippy::eq_op)]
1385fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
1386    match T::PHYSICAL_TYPE {
1387        Type::FLOAT | Type::DOUBLE => val != val,
1388        Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type() == Some(LogicalType::Float16) => {
1389            let val = val.as_bytes();
1390            let val = f16::from_le_bytes([val[0], val[1]]);
1391            val.is_nan()
1392        }
1393        _ => false,
1394    }
1395}
1396
1397/// Perform a conditional update of `cur`, skipping any NaN values
1398///
1399/// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with
1400/// the value of `cur`, and updates `cur` to `Some(val)` if it returns `true`
1401fn update_stat<T: ParquetValueType, F>(
1402    descr: &ColumnDescriptor,
1403    val: &T,
1404    cur: &mut Option<T>,
1405    should_update: F,
1406) where
1407    F: Fn(&T) -> bool,
1408{
1409    if is_nan(descr, val) {
1410        return;
1411    }
1412
1413    if cur.as_ref().is_none_or(should_update) {
1414        *cur = Some(val.clone());
1415    }
1416}
1417
1418/// Evaluate `a > b` according to underlying logical type.
1419fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
1420    match T::PHYSICAL_TYPE {
1421        Type::INT32 | Type::INT64 => {
1422            if let Some(LogicalType::Integer {
1423                is_signed: false, ..
1424            }) = descr.logical_type()
1425            {
1426                // need to compare unsigned
1427                return compare_greater_unsigned_int(a, b);
1428            }
1429
1430            match descr.converted_type() {
1431                ConvertedType::UINT_8
1432                | ConvertedType::UINT_16
1433                | ConvertedType::UINT_32
1434                | ConvertedType::UINT_64 => {
1435                    return compare_greater_unsigned_int(a, b);
1436                }
1437                _ => {}
1438            };
1439        }
1440        Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1441            if let Some(LogicalType::Decimal { .. }) = descr.logical_type() {
1442                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1443            }
1444            if let ConvertedType::DECIMAL = descr.converted_type() {
1445                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1446            }
1447            if let Some(LogicalType::Float16) = descr.logical_type() {
1448                return compare_greater_f16(a.as_bytes(), b.as_bytes());
1449            }
1450        }
1451
1452        _ => {}
1453    }
1454
1455    // compare independent of logical / converted type
1456    a > b
1457}
1458
1459// ----------------------------------------------------------------------
1460// Encoding support for column writer.
1461// This mirrors parquet-mr default encodings for writes. See:
1462// https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java
1463// https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java
1464
1465/// Returns encoding for a column when no other encoding is provided in writer properties.
1466fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
1467    match (kind, props.writer_version()) {
1468        (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
1469        (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1470        (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1471        (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1472        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1473        _ => Encoding::PLAIN,
1474    }
1475}
1476
1477/// Returns true if dictionary is supported for column writer, false otherwise.
1478fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
1479    match (kind, props.writer_version()) {
1480        // Booleans do not support dict encoding and should use a fallback encoding.
1481        (Type::BOOLEAN, _) => false,
1482        // Dictionary encoding was not enabled in PARQUET 1.0
1483        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
1484        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
1485        _ => true,
1486    }
1487}
1488
1489#[inline]
1490fn compare_greater_unsigned_int<T: ParquetValueType>(a: &T, b: &T) -> bool {
1491    a.as_u64().unwrap() > b.as_u64().unwrap()
1492}
1493
1494#[inline]
1495fn compare_greater_f16(a: &[u8], b: &[u8]) -> bool {
1496    let a = f16::from_le_bytes(a.try_into().unwrap());
1497    let b = f16::from_le_bytes(b.try_into().unwrap());
1498    a > b
1499}
1500
/// Signed comparison of bytes arrays holding big-endian two's complement integers (the
/// unscaled value of a Parquet `DECIMAL`). Returns `true` if `a > b`.
///
/// Arrays of different lengths are compared as if the shorter one were sign-extended to the
/// length of the longer one, so e.g. `[0xFF, 0x90]` and `[0x90]` (both -112) compare equal.
/// An empty array compares less than any non-empty array.
fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
    let a_length = a.len();
    let b_length = b.len();

    if a_length == 0 || b_length == 0 {
        return a_length > 0;
    }

    let first_a: u8 = a[0];
    let first_b: u8 = b[0];

    // We can short circuit for numbers of different sign, or for equal length byte arrays
    // that have different first bytes: in both cases the signed value of the first byte
    // decides. Unequal lengths with the same sign need the sign extension handling below,
    // because e.g. 0xFF90 is equal to 0x90.
    if (0x80 & first_a) != (0x80 & first_b) || (a_length == b_length && first_a != first_b) {
        return (first_a as i8) > (first_b as i8);
    }

    // Both numbers have the same sign from here on.
    let negative = (first_a as i8) < 0;
    let extension: u8 = if negative { 0xFF } else { 0 };

    // Split the longer array into its extra leading bytes and a tail that is as long as the
    // shorter array. Equal-length arrays have no extra leading bytes.
    let (a_tail, b_tail, lead) = if a_length > b_length {
        let (lead, tail) = a.split_at(a_length - b_length);
        (tail, b, lead)
    } else {
        let (lead, tail) = b.split_at(b_length - a_length);
        (a, tail, lead)
    };

    if lead.iter().any(|&x| x != extension) {
        // The extra bytes are not just sign extension, so the longer number lies outside the
        // range of the shorter length: it is larger when positive, smaller when negative.
        let a_longer = a_length > b_length;
        return a_longer != negative;
    }

    // The longer value is the sign extension of its tail, so the numbers compare like their
    // equal-length tails; for equal signs that is a plain unsigned lexicographic comparison.
    a_tail > b_tail
}
1546
/// Returns the longest non-empty prefix of `data` that is at most `length` bytes long
/// and ends on a UTF-8 character boundary, as raw bytes.
///
/// Returns `None` when no such prefix exists, e.g. when `length` is 0 or falls inside
/// the first character.
///
/// Callers are expected to pass `length < data.len()`.
fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    let mut end = length;
    while end > 0 && !data.is_char_boundary(end) {
        end -= 1;
    }
    if end == 0 {
        return None;
    }
    Some(data.as_bytes()[..end].to_vec())
}
1556
1557/// Truncate a UTF-8 slice and increment it's final character. The returned value is the
1558/// longest such slice that is still a valid UTF-8 string while being less than `length`
1559/// bytes and non-empty. Returns `None` if no such transformation is possible.
1560///
1561/// The caller guarantees that data.len() > length.
1562fn truncate_and_increment_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
1563    // UTF-8 is max 4 bytes, so start search 3 back from desired length
1564    let lower_bound = length.saturating_sub(3);
1565    let split = (lower_bound..=length).rfind(|x| data.is_char_boundary(*x))?;
1566    increment_utf8(data.get(..split)?)
1567}
1568
/// Increments the last character of `data` that can be incremented without changing its
/// encoded width, and drops any characters after it. Returns the result as UTF-8 bytes.
///
/// A character is skipped (and therefore dropped) when its successor is not a valid
/// `char` (e.g. a surrogate or past `char::MAX`) or would need more bytes than the
/// original. Returns `None` if no character can be incremented, including for `""`.
fn increment_utf8(data: &str) -> Option<Vec<u8>> {
    data.char_indices().rev().find_map(|(start, ch)| {
        let width = ch.len_utf8();
        let successor = char::from_u32(u32::from(ch) + 1).filter(|c| c.len_utf8() == width)?;
        let mut bytes = data.as_bytes()[..start + width].to_vec();
        // Overwrite the original character in place; the width is unchanged.
        successor.encode_utf8(&mut bytes[start..]);
        Some(bytes)
    })
}
1590
/// Treats `data` as a big-endian unsigned integer and adds one to it, keeping its length.
///
/// Returns `None` if every byte is `u8::MAX` (the increment would overflow), including
/// for an empty input.
fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
    // The right-most byte below u8::MAX absorbs the carry; every byte after it wraps to 0.
    let pivot = data.iter().rposition(|&byte| byte != u8::MAX)?;
    data[pivot] += 1;
    data[pivot + 1..].fill(0);
    Some(data)
}
1606
1607#[cfg(test)]
1608mod tests {
1609    use crate::{
1610        file::{properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, writer::SerializedFileWriter},
1611        schema::parser::parse_message_type,
1612    };
1613    use core::str;
1614    use rand::distr::uniform::SampleUniform;
1615    use std::{fs::File, sync::Arc};
1616
1617    use crate::column::{
1618        page::PageReader,
1619        reader::{ColumnReaderImpl, get_column_reader, get_typed_column_reader},
1620    };
1621    use crate::file::writer::TrackedWrite;
1622    use crate::file::{
1623        properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
1624    };
1625    use crate::schema::types::{ColumnPath, Type as SchemaType};
1626    use crate::util::test_common::rand_gen::random_numbers_range;
1627
1628    use super::*;
1629
1630    #[test]
1631    fn test_column_writer_inconsistent_def_rep_length() {
1632        let page_writer = get_test_page_writer();
1633        let props = Default::default();
1634        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 1, props);
1635        let res = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0]));
1636        assert!(res.is_err());
1637        if let Err(err) = res {
1638            assert_eq!(
1639                format!("{err}"),
1640                "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2"
1641            );
1642        }
1643    }
1644
1645    #[test]
1646    fn test_column_writer_invalid_def_levels() {
1647        let page_writer = get_test_page_writer();
1648        let props = Default::default();
1649        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1650        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1651        assert!(res.is_err());
1652        if let Err(err) = res {
1653            assert_eq!(
1654                format!("{err}"),
1655                "Parquet error: Definition levels are required, because max definition level = 1"
1656            );
1657        }
1658    }
1659
1660    #[test]
1661    fn test_column_writer_invalid_rep_levels() {
1662        let page_writer = get_test_page_writer();
1663        let props = Default::default();
1664        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 1, props);
1665        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1666        assert!(res.is_err());
1667        if let Err(err) = res {
1668            assert_eq!(
1669                format!("{err}"),
1670                "Parquet error: Repetition levels are required, because max repetition level = 1"
1671            );
1672        }
1673    }
1674
1675    #[test]
1676    fn test_column_writer_not_enough_values_to_write() {
1677        let page_writer = get_test_page_writer();
1678        let props = Default::default();
1679        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1680        let res = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None);
1681        assert!(res.is_err());
1682        if let Err(err) = res {
1683            assert_eq!(
1684                format!("{err}"),
1685                "Parquet error: Expected to write 4 values, but have only 2"
1686            );
1687        }
1688    }
1689
1690    #[test]
1691    fn test_column_writer_write_only_one_dictionary_page() {
1692        let page_writer = get_test_page_writer();
1693        let props = Default::default();
1694        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1695        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1696        // First page should be correctly written.
1697        writer.add_data_page().unwrap();
1698        writer.write_dictionary_page().unwrap();
1699        let err = writer.write_dictionary_page().unwrap_err().to_string();
1700        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1701    }
1702
1703    #[test]
1704    fn test_column_writer_error_when_writing_disabled_dictionary() {
1705        let page_writer = get_test_page_writer();
1706        let props = Arc::new(
1707            WriterProperties::builder()
1708                .set_dictionary_enabled(false)
1709                .build(),
1710        );
1711        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1712        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1713        let err = writer.write_dictionary_page().unwrap_err().to_string();
1714        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1715    }
1716
1717    #[test]
1718    fn test_column_writer_boolean_type_does_not_support_dictionary() {
1719        let page_writer = get_test_page_writer();
1720        let props = Arc::new(
1721            WriterProperties::builder()
1722                .set_dictionary_enabled(true)
1723                .build(),
1724        );
1725        let mut writer = get_test_column_writer::<BoolType>(page_writer, 0, 0, props);
1726        writer
1727            .write_batch(&[true, false, true, false], None, None)
1728            .unwrap();
1729
1730        let r = writer.close().unwrap();
1731        // PlainEncoder uses bit writer to write boolean values, which all fit into 1
1732        // byte.
1733        assert_eq!(r.bytes_written, 1);
1734        assert_eq!(r.rows_written, 4);
1735
1736        let metadata = r.metadata;
1737        assert_eq!(metadata.encodings(), &vec![Encoding::PLAIN, Encoding::RLE]);
1738        assert_eq!(metadata.num_values(), 4); // just values
1739        assert_eq!(metadata.dictionary_page_offset(), None);
1740    }
1741
1742    #[test]
1743    fn test_column_writer_default_encoding_support_bool() {
1744        check_encoding_write_support::<BoolType>(
1745            WriterVersion::PARQUET_1_0,
1746            true,
1747            &[true, false],
1748            None,
1749            &[Encoding::PLAIN, Encoding::RLE],
1750            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1751        );
1752        check_encoding_write_support::<BoolType>(
1753            WriterVersion::PARQUET_1_0,
1754            false,
1755            &[true, false],
1756            None,
1757            &[Encoding::PLAIN, Encoding::RLE],
1758            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1759        );
1760        check_encoding_write_support::<BoolType>(
1761            WriterVersion::PARQUET_2_0,
1762            true,
1763            &[true, false],
1764            None,
1765            &[Encoding::RLE],
1766            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
1767        );
1768        check_encoding_write_support::<BoolType>(
1769            WriterVersion::PARQUET_2_0,
1770            false,
1771            &[true, false],
1772            None,
1773            &[Encoding::RLE],
1774            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
1775        );
1776    }
1777
1778    #[test]
1779    fn test_column_writer_default_encoding_support_int32() {
1780        check_encoding_write_support::<Int32Type>(
1781            WriterVersion::PARQUET_1_0,
1782            true,
1783            &[1, 2],
1784            Some(0),
1785            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1786            &[
1787                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1788                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1789            ],
1790        );
1791        check_encoding_write_support::<Int32Type>(
1792            WriterVersion::PARQUET_1_0,
1793            false,
1794            &[1, 2],
1795            None,
1796            &[Encoding::PLAIN, Encoding::RLE],
1797            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1798        );
1799        check_encoding_write_support::<Int32Type>(
1800            WriterVersion::PARQUET_2_0,
1801            true,
1802            &[1, 2],
1803            Some(0),
1804            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1805            &[
1806                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1807                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1808            ],
1809        );
1810        check_encoding_write_support::<Int32Type>(
1811            WriterVersion::PARQUET_2_0,
1812            false,
1813            &[1, 2],
1814            None,
1815            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
1816            &[encoding_stats(
1817                PageType::DATA_PAGE_V2,
1818                Encoding::DELTA_BINARY_PACKED,
1819                1,
1820            )],
1821        );
1822    }
1823
1824    #[test]
1825    fn test_column_writer_default_encoding_support_int64() {
1826        check_encoding_write_support::<Int64Type>(
1827            WriterVersion::PARQUET_1_0,
1828            true,
1829            &[1, 2],
1830            Some(0),
1831            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1832            &[
1833                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1834                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1835            ],
1836        );
1837        check_encoding_write_support::<Int64Type>(
1838            WriterVersion::PARQUET_1_0,
1839            false,
1840            &[1, 2],
1841            None,
1842            &[Encoding::PLAIN, Encoding::RLE],
1843            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1844        );
1845        check_encoding_write_support::<Int64Type>(
1846            WriterVersion::PARQUET_2_0,
1847            true,
1848            &[1, 2],
1849            Some(0),
1850            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1851            &[
1852                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1853                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1854            ],
1855        );
1856        check_encoding_write_support::<Int64Type>(
1857            WriterVersion::PARQUET_2_0,
1858            false,
1859            &[1, 2],
1860            None,
1861            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
1862            &[encoding_stats(
1863                PageType::DATA_PAGE_V2,
1864                Encoding::DELTA_BINARY_PACKED,
1865                1,
1866            )],
1867        );
1868    }
1869
1870    #[test]
1871    fn test_column_writer_default_encoding_support_int96() {
1872        check_encoding_write_support::<Int96Type>(
1873            WriterVersion::PARQUET_1_0,
1874            true,
1875            &[Int96::from(vec![1, 2, 3])],
1876            Some(0),
1877            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1878            &[
1879                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1880                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1881            ],
1882        );
1883        check_encoding_write_support::<Int96Type>(
1884            WriterVersion::PARQUET_1_0,
1885            false,
1886            &[Int96::from(vec![1, 2, 3])],
1887            None,
1888            &[Encoding::PLAIN, Encoding::RLE],
1889            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1890        );
1891        check_encoding_write_support::<Int96Type>(
1892            WriterVersion::PARQUET_2_0,
1893            true,
1894            &[Int96::from(vec![1, 2, 3])],
1895            Some(0),
1896            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1897            &[
1898                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1899                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1900            ],
1901        );
1902        check_encoding_write_support::<Int96Type>(
1903            WriterVersion::PARQUET_2_0,
1904            false,
1905            &[Int96::from(vec![1, 2, 3])],
1906            None,
1907            &[Encoding::PLAIN, Encoding::RLE],
1908            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
1909        );
1910    }
1911
1912    #[test]
1913    fn test_column_writer_default_encoding_support_float() {
1914        check_encoding_write_support::<FloatType>(
1915            WriterVersion::PARQUET_1_0,
1916            true,
1917            &[1.0, 2.0],
1918            Some(0),
1919            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1920            &[
1921                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1922                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1923            ],
1924        );
1925        check_encoding_write_support::<FloatType>(
1926            WriterVersion::PARQUET_1_0,
1927            false,
1928            &[1.0, 2.0],
1929            None,
1930            &[Encoding::PLAIN, Encoding::RLE],
1931            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1932        );
1933        check_encoding_write_support::<FloatType>(
1934            WriterVersion::PARQUET_2_0,
1935            true,
1936            &[1.0, 2.0],
1937            Some(0),
1938            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1939            &[
1940                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1941                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1942            ],
1943        );
1944        check_encoding_write_support::<FloatType>(
1945            WriterVersion::PARQUET_2_0,
1946            false,
1947            &[1.0, 2.0],
1948            None,
1949            &[Encoding::PLAIN, Encoding::RLE],
1950            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
1951        );
1952    }
1953
1954    #[test]
1955    fn test_column_writer_default_encoding_support_double() {
1956        check_encoding_write_support::<DoubleType>(
1957            WriterVersion::PARQUET_1_0,
1958            true,
1959            &[1.0, 2.0],
1960            Some(0),
1961            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1962            &[
1963                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1964                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1965            ],
1966        );
1967        check_encoding_write_support::<DoubleType>(
1968            WriterVersion::PARQUET_1_0,
1969            false,
1970            &[1.0, 2.0],
1971            None,
1972            &[Encoding::PLAIN, Encoding::RLE],
1973            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1974        );
1975        check_encoding_write_support::<DoubleType>(
1976            WriterVersion::PARQUET_2_0,
1977            true,
1978            &[1.0, 2.0],
1979            Some(0),
1980            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1981            &[
1982                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1983                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1984            ],
1985        );
1986        check_encoding_write_support::<DoubleType>(
1987            WriterVersion::PARQUET_2_0,
1988            false,
1989            &[1.0, 2.0],
1990            None,
1991            &[Encoding::PLAIN, Encoding::RLE],
1992            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
1993        );
1994    }
1995
1996    #[test]
1997    fn test_column_writer_default_encoding_support_byte_array() {
1998        check_encoding_write_support::<ByteArrayType>(
1999            WriterVersion::PARQUET_1_0,
2000            true,
2001            &[ByteArray::from(vec![1u8])],
2002            Some(0),
2003            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2004            &[
2005                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2006                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
2007            ],
2008        );
2009        check_encoding_write_support::<ByteArrayType>(
2010            WriterVersion::PARQUET_1_0,
2011            false,
2012            &[ByteArray::from(vec![1u8])],
2013            None,
2014            &[Encoding::PLAIN, Encoding::RLE],
2015            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2016        );
2017        check_encoding_write_support::<ByteArrayType>(
2018            WriterVersion::PARQUET_2_0,
2019            true,
2020            &[ByteArray::from(vec![1u8])],
2021            Some(0),
2022            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2023            &[
2024                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2025                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
2026            ],
2027        );
2028        check_encoding_write_support::<ByteArrayType>(
2029            WriterVersion::PARQUET_2_0,
2030            false,
2031            &[ByteArray::from(vec![1u8])],
2032            None,
2033            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
2034            &[encoding_stats(
2035                PageType::DATA_PAGE_V2,
2036                Encoding::DELTA_BYTE_ARRAY,
2037                1,
2038            )],
2039        );
2040    }
2041
2042    #[test]
2043    fn test_column_writer_default_encoding_support_fixed_len_byte_array() {
2044        check_encoding_write_support::<FixedLenByteArrayType>(
2045            WriterVersion::PARQUET_1_0,
2046            true,
2047            &[ByteArray::from(vec![1u8]).into()],
2048            None,
2049            &[Encoding::PLAIN, Encoding::RLE],
2050            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2051        );
2052        check_encoding_write_support::<FixedLenByteArrayType>(
2053            WriterVersion::PARQUET_1_0,
2054            false,
2055            &[ByteArray::from(vec![1u8]).into()],
2056            None,
2057            &[Encoding::PLAIN, Encoding::RLE],
2058            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2059        );
2060        check_encoding_write_support::<FixedLenByteArrayType>(
2061            WriterVersion::PARQUET_2_0,
2062            true,
2063            &[ByteArray::from(vec![1u8]).into()],
2064            Some(0),
2065            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2066            &[
2067                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2068                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
2069            ],
2070        );
2071        check_encoding_write_support::<FixedLenByteArrayType>(
2072            WriterVersion::PARQUET_2_0,
2073            false,
2074            &[ByteArray::from(vec![1u8]).into()],
2075            None,
2076            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
2077            &[encoding_stats(
2078                PageType::DATA_PAGE_V2,
2079                Encoding::DELTA_BYTE_ARRAY,
2080                1,
2081            )],
2082        );
2083    }
2084
2085    #[test]
2086    fn test_column_writer_check_metadata() {
2087        let page_writer = get_test_page_writer();
2088        let props = Default::default();
2089        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2090        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2091
2092        let r = writer.close().unwrap();
2093        assert_eq!(r.bytes_written, 20);
2094        assert_eq!(r.rows_written, 4);
2095
2096        let metadata = r.metadata;
2097        assert_eq!(
2098            metadata.encodings(),
2099            &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2100        );
2101        assert_eq!(metadata.num_values(), 4);
2102        assert_eq!(metadata.compressed_size(), 20);
2103        assert_eq!(metadata.uncompressed_size(), 20);
2104        assert_eq!(metadata.data_page_offset(), 0);
2105        assert_eq!(metadata.dictionary_page_offset(), Some(0));
2106        if let Some(stats) = metadata.statistics() {
2107            assert_eq!(stats.null_count_opt(), Some(0));
2108            assert_eq!(stats.distinct_count_opt(), None);
2109            if let Statistics::Int32(stats) = stats {
2110                assert_eq!(stats.min_opt().unwrap(), &1);
2111                assert_eq!(stats.max_opt().unwrap(), &4);
2112            } else {
2113                panic!("expecting Statistics::Int32");
2114            }
2115        } else {
2116            panic!("metadata missing statistics");
2117        }
2118    }
2119
2120    #[test]
2121    fn test_column_writer_check_byte_array_min_max() {
2122        let page_writer = get_test_page_writer();
2123        let props = Default::default();
2124        let mut writer = get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
2125        writer
2126            .write_batch(
2127                &[
2128                    ByteArray::from(vec![
2129                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2130                        35u8, 231u8, 90u8, 0u8, 0u8,
2131                    ]),
2132                    ByteArray::from(vec![
2133                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8, 62u8, 146u8,
2134                        152u8, 177u8, 56u8, 0u8, 0u8,
2135                    ]),
2136                    ByteArray::from(vec![
2137                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
2138                        0u8,
2139                    ]),
2140                    ByteArray::from(vec![
2141                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2142                        44u8, 0u8, 0u8,
2143                    ]),
2144                ],
2145                None,
2146                None,
2147            )
2148            .unwrap();
2149        let metadata = writer.close().unwrap().metadata;
2150        if let Some(stats) = metadata.statistics() {
2151            if let Statistics::ByteArray(stats) = stats {
2152                assert_eq!(
2153                    stats.min_opt().unwrap(),
2154                    &ByteArray::from(vec![
2155                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2156                        35u8, 231u8, 90u8, 0u8, 0u8,
2157                    ])
2158                );
2159                assert_eq!(
2160                    stats.max_opt().unwrap(),
2161                    &ByteArray::from(vec![
2162                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2163                        44u8, 0u8, 0u8,
2164                    ])
2165                );
2166            } else {
2167                panic!("expecting Statistics::ByteArray");
2168            }
2169        } else {
2170            panic!("metadata missing statistics");
2171        }
2172    }
2173
2174    #[test]
2175    fn test_column_writer_uint32_converted_type_min_max() {
2176        let page_writer = get_test_page_writer();
2177        let props = Default::default();
2178        let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
2179            page_writer,
2180            0,
2181            0,
2182            props,
2183        );
2184        writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
2185        let metadata = writer.close().unwrap().metadata;
2186        if let Some(stats) = metadata.statistics() {
2187            if let Statistics::Int32(stats) = stats {
2188                assert_eq!(stats.min_opt().unwrap(), &0,);
2189                assert_eq!(stats.max_opt().unwrap(), &5,);
2190            } else {
2191                panic!("expecting Statistics::Int32");
2192            }
2193        } else {
2194            panic!("metadata missing statistics");
2195        }
2196    }
2197
2198    #[test]
2199    fn test_column_writer_precalculated_statistics() {
2200        let page_writer = get_test_page_writer();
2201        let props = Arc::new(
2202            WriterProperties::builder()
2203                .set_statistics_enabled(EnabledStatistics::Chunk)
2204                .build(),
2205        );
2206        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2207        writer
2208            .write_batch_with_statistics(
2209                &[1, 2, 3, 4],
2210                None,
2211                None,
2212                Some(&-17),
2213                Some(&9000),
2214                Some(55),
2215            )
2216            .unwrap();
2217
2218        let r = writer.close().unwrap();
2219        assert_eq!(r.bytes_written, 20);
2220        assert_eq!(r.rows_written, 4);
2221
2222        let metadata = r.metadata;
2223        assert_eq!(
2224            metadata.encodings(),
2225            &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2226        );
2227        assert_eq!(metadata.num_values(), 4);
2228        assert_eq!(metadata.compressed_size(), 20);
2229        assert_eq!(metadata.uncompressed_size(), 20);
2230        assert_eq!(metadata.data_page_offset(), 0);
2231        assert_eq!(metadata.dictionary_page_offset(), Some(0));
2232        if let Some(stats) = metadata.statistics() {
2233            assert_eq!(stats.null_count_opt(), Some(0));
2234            assert_eq!(stats.distinct_count_opt().unwrap_or(0), 55);
2235            if let Statistics::Int32(stats) = stats {
2236                assert_eq!(stats.min_opt().unwrap(), &-17);
2237                assert_eq!(stats.max_opt().unwrap(), &9000);
2238            } else {
2239                panic!("expecting Statistics::Int32");
2240            }
2241        } else {
2242            panic!("metadata missing statistics");
2243        }
2244    }
2245
2246    #[test]
2247    fn test_mixed_precomputed_statistics() {
2248        let mut buf = Vec::with_capacity(100);
2249        let mut write = TrackedWrite::new(&mut buf);
2250        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2251        let props = Arc::new(
2252            WriterProperties::builder()
2253                .set_write_page_header_statistics(true)
2254                .build(),
2255        );
2256        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2257
2258        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2259        writer
2260            .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3))
2261            .unwrap();
2262
2263        let r = writer.close().unwrap();
2264
2265        let stats = r.metadata.statistics().unwrap();
2266        assert_eq!(stats.min_bytes_opt().unwrap(), 1_i32.to_le_bytes());
2267        assert_eq!(stats.max_bytes_opt().unwrap(), 7_i32.to_le_bytes());
2268        assert_eq!(stats.null_count_opt(), Some(0));
2269        assert!(stats.distinct_count_opt().is_none());
2270
2271        drop(write);
2272
2273        let props = ReaderProperties::builder()
2274            .set_backward_compatible_lz4(false)
2275            .set_read_page_statistics(true)
2276            .build();
2277        let reader = SerializedPageReader::new_with_properties(
2278            Arc::new(Bytes::from(buf)),
2279            &r.metadata,
2280            r.rows_written as usize,
2281            None,
2282            Arc::new(props),
2283        )
2284        .unwrap();
2285
2286        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2287        assert_eq!(pages.len(), 2);
2288
2289        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2290        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);
2291
2292        let page_statistics = pages[1].statistics().unwrap();
2293        assert_eq!(
2294            page_statistics.min_bytes_opt().unwrap(),
2295            1_i32.to_le_bytes()
2296        );
2297        assert_eq!(
2298            page_statistics.max_bytes_opt().unwrap(),
2299            7_i32.to_le_bytes()
2300        );
2301        assert_eq!(page_statistics.null_count_opt(), Some(0));
2302        assert!(page_statistics.distinct_count_opt().is_none());
2303    }
2304
2305    #[test]
2306    fn test_disabled_statistics() {
2307        let mut buf = Vec::with_capacity(100);
2308        let mut write = TrackedWrite::new(&mut buf);
2309        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2310        let props = WriterProperties::builder()
2311            .set_statistics_enabled(EnabledStatistics::None)
2312            .set_writer_version(WriterVersion::PARQUET_2_0)
2313            .build();
2314        let props = Arc::new(props);
2315
2316        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2317        writer
2318            .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
2319            .unwrap();
2320
2321        let r = writer.close().unwrap();
2322        assert!(r.metadata.statistics().is_none());
2323
2324        drop(write);
2325
2326        let props = ReaderProperties::builder()
2327            .set_backward_compatible_lz4(false)
2328            .build();
2329        let reader = SerializedPageReader::new_with_properties(
2330            Arc::new(Bytes::from(buf)),
2331            &r.metadata,
2332            r.rows_written as usize,
2333            None,
2334            Arc::new(props),
2335        )
2336        .unwrap();
2337
2338        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2339        assert_eq!(pages.len(), 2);
2340
2341        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2342        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2);
2343
2344        match &pages[1] {
2345            Page::DataPageV2 {
2346                num_values,
2347                num_nulls,
2348                num_rows,
2349                statistics,
2350                ..
2351            } => {
2352                assert_eq!(*num_values, 6);
2353                assert_eq!(*num_nulls, 2);
2354                assert_eq!(*num_rows, 6);
2355                assert!(statistics.is_none());
2356            }
2357            _ => unreachable!(),
2358        }
2359    }
2360
2361    #[test]
2362    fn test_column_writer_empty_column_roundtrip() {
2363        let props = Default::default();
2364        column_roundtrip::<Int32Type>(props, &[], None, None);
2365    }
2366
2367    #[test]
2368    fn test_column_writer_non_nullable_values_roundtrip() {
2369        let props = Default::default();
2370        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
2371    }
2372
2373    #[test]
2374    fn test_column_writer_nullable_non_repeated_values_roundtrip() {
2375        let props = Default::default();
2376        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 0);
2377    }
2378
2379    #[test]
2380    fn test_column_writer_nullable_repeated_values_roundtrip() {
2381        let props = Default::default();
2382        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2383    }
2384
2385    #[test]
2386    fn test_column_writer_dictionary_fallback_small_data_page() {
2387        let props = WriterProperties::builder()
2388            .set_dictionary_page_size_limit(32)
2389            .set_data_page_size_limit(32)
2390            .build();
2391        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2392    }
2393
2394    #[test]
2395    fn test_column_writer_small_write_batch_size() {
2396        for i in &[1usize, 2, 5, 10, 11, 1023] {
2397            let props = WriterProperties::builder().set_write_batch_size(*i).build();
2398
2399            column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2400        }
2401    }
2402
2403    #[test]
2404    fn test_column_writer_dictionary_disabled_v1() {
2405        let props = WriterProperties::builder()
2406            .set_writer_version(WriterVersion::PARQUET_1_0)
2407            .set_dictionary_enabled(false)
2408            .build();
2409        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2410    }
2411
2412    #[test]
2413    fn test_column_writer_dictionary_disabled_v2() {
2414        let props = WriterProperties::builder()
2415            .set_writer_version(WriterVersion::PARQUET_2_0)
2416            .set_dictionary_enabled(false)
2417            .build();
2418        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2419    }
2420
2421    #[test]
2422    fn test_column_writer_compression_v1() {
2423        let props = WriterProperties::builder()
2424            .set_writer_version(WriterVersion::PARQUET_1_0)
2425            .set_compression(Compression::SNAPPY)
2426            .build();
2427        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2428    }
2429
2430    #[test]
2431    fn test_column_writer_compression_v2() {
2432        let props = WriterProperties::builder()
2433            .set_writer_version(WriterVersion::PARQUET_2_0)
2434            .set_compression(Compression::SNAPPY)
2435            .build();
2436        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2437    }
2438
2439    #[test]
2440    fn test_column_writer_add_data_pages_with_dict() {
2441        // ARROW-5129: Test verifies that we add data page in case of dictionary encoding
2442        // and no fallback occurred so far.
2443        let mut file = tempfile::tempfile().unwrap();
2444        let mut write = TrackedWrite::new(&mut file);
2445        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2446        let props = Arc::new(
2447            WriterProperties::builder()
2448                .set_data_page_size_limit(10)
2449                .set_write_batch_size(3) // write 3 values at a time
2450                .build(),
2451        );
2452        let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
2453        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2454        writer.write_batch(data, None, None).unwrap();
2455        let r = writer.close().unwrap();
2456
2457        drop(write);
2458
2459        // Read pages and check the sequence
2460        let props = ReaderProperties::builder()
2461            .set_backward_compatible_lz4(false)
2462            .build();
2463        let mut page_reader = Box::new(
2464            SerializedPageReader::new_with_properties(
2465                Arc::new(file),
2466                &r.metadata,
2467                r.rows_written as usize,
2468                None,
2469                Arc::new(props),
2470            )
2471            .unwrap(),
2472        );
2473        let mut res = Vec::new();
2474        while let Some(page) = page_reader.get_next_page().unwrap() {
2475            res.push((page.page_type(), page.num_values(), page.buffer().len()));
2476        }
2477        assert_eq!(
2478            res,
2479            vec![
2480                (PageType::DICTIONARY_PAGE, 10, 40),
2481                (PageType::DATA_PAGE, 9, 10),
2482                (PageType::DATA_PAGE, 1, 3),
2483            ]
2484        );
2485        assert_eq!(
2486            r.metadata.page_encoding_stats(),
2487            Some(&vec![
2488                PageEncodingStats {
2489                    page_type: PageType::DICTIONARY_PAGE,
2490                    encoding: Encoding::PLAIN,
2491                    count: 1
2492                },
2493                PageEncodingStats {
2494                    page_type: PageType::DATA_PAGE,
2495                    encoding: Encoding::RLE_DICTIONARY,
2496                    count: 2,
2497                }
2498            ])
2499        );
2500    }
2501
2502    #[test]
2503    fn test_bool_statistics() {
2504        let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
2505        // Booleans have an unsigned sort order and so are not compatible
2506        // with the deprecated `min` and `max` statistics
2507        assert!(!stats.is_min_max_backwards_compatible());
2508        if let Statistics::Boolean(stats) = stats {
2509            assert_eq!(stats.min_opt().unwrap(), &false);
2510            assert_eq!(stats.max_opt().unwrap(), &true);
2511        } else {
2512            panic!("expecting Statistics::Boolean, got {stats:?}");
2513        }
2514    }
2515
2516    #[test]
2517    fn test_int32_statistics() {
2518        let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
2519        assert!(stats.is_min_max_backwards_compatible());
2520        if let Statistics::Int32(stats) = stats {
2521            assert_eq!(stats.min_opt().unwrap(), &-2);
2522            assert_eq!(stats.max_opt().unwrap(), &3);
2523        } else {
2524            panic!("expecting Statistics::Int32, got {stats:?}");
2525        }
2526    }
2527
2528    #[test]
2529    fn test_int64_statistics() {
2530        let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
2531        assert!(stats.is_min_max_backwards_compatible());
2532        if let Statistics::Int64(stats) = stats {
2533            assert_eq!(stats.min_opt().unwrap(), &-2);
2534            assert_eq!(stats.max_opt().unwrap(), &3);
2535        } else {
2536            panic!("expecting Statistics::Int64, got {stats:?}");
2537        }
2538    }
2539
2540    #[test]
2541    fn test_int96_statistics() {
2542        let input = vec![
2543            Int96::from(vec![1, 20, 30]),
2544            Int96::from(vec![3, 20, 10]),
2545            Int96::from(vec![0, 20, 30]),
2546            Int96::from(vec![2, 20, 30]),
2547        ]
2548        .into_iter()
2549        .collect::<Vec<Int96>>();
2550
2551        let stats = statistics_roundtrip::<Int96Type>(&input);
2552        assert!(!stats.is_min_max_backwards_compatible());
2553        if let Statistics::Int96(stats) = stats {
2554            assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
2555            assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![2, 20, 30]));
2556        } else {
2557            panic!("expecting Statistics::Int96, got {stats:?}");
2558        }
2559    }
2560
2561    #[test]
2562    fn test_float_statistics() {
2563        let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]);
2564        assert!(stats.is_min_max_backwards_compatible());
2565        if let Statistics::Float(stats) = stats {
2566            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2567            assert_eq!(stats.max_opt().unwrap(), &3.0);
2568        } else {
2569            panic!("expecting Statistics::Float, got {stats:?}");
2570        }
2571    }
2572
2573    #[test]
2574    fn test_double_statistics() {
2575        let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]);
2576        assert!(stats.is_min_max_backwards_compatible());
2577        if let Statistics::Double(stats) = stats {
2578            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2579            assert_eq!(stats.max_opt().unwrap(), &3.0);
2580        } else {
2581            panic!("expecting Statistics::Double, got {stats:?}");
2582        }
2583    }
2584
2585    #[test]
2586    fn test_byte_array_statistics() {
2587        let input = ["aawaa", "zz", "aaw", "m", "qrs"]
2588            .iter()
2589            .map(|&s| s.into())
2590            .collect::<Vec<_>>();
2591
2592        let stats = statistics_roundtrip::<ByteArrayType>(&input);
2593        assert!(!stats.is_min_max_backwards_compatible());
2594        if let Statistics::ByteArray(stats) = stats {
2595            assert_eq!(stats.min_opt().unwrap(), &ByteArray::from("aaw"));
2596            assert_eq!(stats.max_opt().unwrap(), &ByteArray::from("zz"));
2597        } else {
2598            panic!("expecting Statistics::ByteArray, got {stats:?}");
2599        }
2600    }
2601
2602    #[test]
2603    fn test_fixed_len_byte_array_statistics() {
2604        let input = ["aawaa", "zz   ", "aaw  ", "m    ", "qrs  "]
2605            .iter()
2606            .map(|&s| ByteArray::from(s).into())
2607            .collect::<Vec<_>>();
2608
2609        let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
2610        assert!(!stats.is_min_max_backwards_compatible());
2611        if let Statistics::FixedLenByteArray(stats) = stats {
2612            let expected_min: FixedLenByteArray = ByteArray::from("aaw  ").into();
2613            assert_eq!(stats.min_opt().unwrap(), &expected_min);
2614            let expected_max: FixedLenByteArray = ByteArray::from("zz   ").into();
2615            assert_eq!(stats.max_opt().unwrap(), &expected_max);
2616        } else {
2617            panic!("expecting Statistics::FixedLenByteArray, got {stats:?}");
2618        }
2619    }
2620
2621    #[test]
2622    fn test_column_writer_check_float16_min_max() {
2623        let input = [
2624            -f16::ONE,
2625            f16::from_f32(3.0),
2626            -f16::from_f32(2.0),
2627            f16::from_f32(2.0),
2628        ]
2629        .into_iter()
2630        .map(|s| ByteArray::from(s).into())
2631        .collect::<Vec<_>>();
2632
2633        let stats = float16_statistics_roundtrip(&input);
2634        assert!(stats.is_min_max_backwards_compatible());
2635        assert_eq!(
2636            stats.min_opt().unwrap(),
2637            &ByteArray::from(-f16::from_f32(2.0))
2638        );
2639        assert_eq!(
2640            stats.max_opt().unwrap(),
2641            &ByteArray::from(f16::from_f32(3.0))
2642        );
2643    }
2644
2645    #[test]
2646    fn test_column_writer_check_float16_nan_middle() {
2647        let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2648            .into_iter()
2649            .map(|s| ByteArray::from(s).into())
2650            .collect::<Vec<_>>();
2651
2652        let stats = float16_statistics_roundtrip(&input);
2653        assert!(stats.is_min_max_backwards_compatible());
2654        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2655        assert_eq!(
2656            stats.max_opt().unwrap(),
2657            &ByteArray::from(f16::ONE + f16::ONE)
2658        );
2659    }
2660
2661    #[test]
2662    fn test_float16_statistics_nan_middle() {
2663        let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2664            .into_iter()
2665            .map(|s| ByteArray::from(s).into())
2666            .collect::<Vec<_>>();
2667
2668        let stats = float16_statistics_roundtrip(&input);
2669        assert!(stats.is_min_max_backwards_compatible());
2670        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2671        assert_eq!(
2672            stats.max_opt().unwrap(),
2673            &ByteArray::from(f16::ONE + f16::ONE)
2674        );
2675    }
2676
2677    #[test]
2678    fn test_float16_statistics_nan_start() {
2679        let input = [f16::NAN, f16::ONE, f16::ONE + f16::ONE]
2680            .into_iter()
2681            .map(|s| ByteArray::from(s).into())
2682            .collect::<Vec<_>>();
2683
2684        let stats = float16_statistics_roundtrip(&input);
2685        assert!(stats.is_min_max_backwards_compatible());
2686        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2687        assert_eq!(
2688            stats.max_opt().unwrap(),
2689            &ByteArray::from(f16::ONE + f16::ONE)
2690        );
2691    }
2692
2693    #[test]
2694    fn test_float16_statistics_nan_only() {
2695        let input = [f16::NAN, f16::NAN]
2696            .into_iter()
2697            .map(|s| ByteArray::from(s).into())
2698            .collect::<Vec<_>>();
2699
2700        let stats = float16_statistics_roundtrip(&input);
2701        assert!(stats.min_bytes_opt().is_none());
2702        assert!(stats.max_bytes_opt().is_none());
2703        assert!(stats.is_min_max_backwards_compatible());
2704    }
2705
2706    #[test]
2707    fn test_float16_statistics_zero_only() {
2708        let input = [f16::ZERO]
2709            .into_iter()
2710            .map(|s| ByteArray::from(s).into())
2711            .collect::<Vec<_>>();
2712
2713        let stats = float16_statistics_roundtrip(&input);
2714        assert!(stats.is_min_max_backwards_compatible());
2715        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2716        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2717    }
2718
2719    #[test]
2720    fn test_float16_statistics_neg_zero_only() {
2721        let input = [f16::NEG_ZERO]
2722            .into_iter()
2723            .map(|s| ByteArray::from(s).into())
2724            .collect::<Vec<_>>();
2725
2726        let stats = float16_statistics_roundtrip(&input);
2727        assert!(stats.is_min_max_backwards_compatible());
2728        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2729        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2730    }
2731
2732    #[test]
2733    fn test_float16_statistics_zero_min() {
2734        let input = [f16::ZERO, f16::ONE, f16::NAN, f16::PI]
2735            .into_iter()
2736            .map(|s| ByteArray::from(s).into())
2737            .collect::<Vec<_>>();
2738
2739        let stats = float16_statistics_roundtrip(&input);
2740        assert!(stats.is_min_max_backwards_compatible());
2741        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2742        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::PI));
2743    }
2744
2745    #[test]
2746    fn test_float16_statistics_neg_zero_max() {
2747        let input = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, -f16::PI]
2748            .into_iter()
2749            .map(|s| ByteArray::from(s).into())
2750            .collect::<Vec<_>>();
2751
2752        let stats = float16_statistics_roundtrip(&input);
2753        assert!(stats.is_min_max_backwards_compatible());
2754        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-f16::PI));
2755        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2756    }
2757
2758    #[test]
2759    fn test_float_statistics_nan_middle() {
2760        let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
2761        assert!(stats.is_min_max_backwards_compatible());
2762        if let Statistics::Float(stats) = stats {
2763            assert_eq!(stats.min_opt().unwrap(), &1.0);
2764            assert_eq!(stats.max_opt().unwrap(), &2.0);
2765        } else {
2766            panic!("expecting Statistics::Float");
2767        }
2768    }
2769
2770    #[test]
2771    fn test_float_statistics_nan_start() {
2772        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
2773        assert!(stats.is_min_max_backwards_compatible());
2774        if let Statistics::Float(stats) = stats {
2775            assert_eq!(stats.min_opt().unwrap(), &1.0);
2776            assert_eq!(stats.max_opt().unwrap(), &2.0);
2777        } else {
2778            panic!("expecting Statistics::Float");
2779        }
2780    }
2781
2782    #[test]
2783    fn test_float_statistics_nan_only() {
2784        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, f32::NAN]);
2785        assert!(stats.min_bytes_opt().is_none());
2786        assert!(stats.max_bytes_opt().is_none());
2787        assert!(stats.is_min_max_backwards_compatible());
2788        assert!(matches!(stats, Statistics::Float(_)));
2789    }
2790
2791    #[test]
2792    fn test_float_statistics_zero_only() {
2793        let stats = statistics_roundtrip::<FloatType>(&[0.0]);
2794        assert!(stats.is_min_max_backwards_compatible());
2795        if let Statistics::Float(stats) = stats {
2796            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2797            assert!(stats.min_opt().unwrap().is_sign_negative());
2798            assert_eq!(stats.max_opt().unwrap(), &0.0);
2799            assert!(stats.max_opt().unwrap().is_sign_positive());
2800        } else {
2801            panic!("expecting Statistics::Float");
2802        }
2803    }
2804
2805    #[test]
2806    fn test_float_statistics_neg_zero_only() {
2807        let stats = statistics_roundtrip::<FloatType>(&[-0.0]);
2808        assert!(stats.is_min_max_backwards_compatible());
2809        if let Statistics::Float(stats) = stats {
2810            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2811            assert!(stats.min_opt().unwrap().is_sign_negative());
2812            assert_eq!(stats.max_opt().unwrap(), &0.0);
2813            assert!(stats.max_opt().unwrap().is_sign_positive());
2814        } else {
2815            panic!("expecting Statistics::Float");
2816        }
2817    }
2818
2819    #[test]
2820    fn test_float_statistics_zero_min() {
2821        let stats = statistics_roundtrip::<FloatType>(&[0.0, 1.0, f32::NAN, 2.0]);
2822        assert!(stats.is_min_max_backwards_compatible());
2823        if let Statistics::Float(stats) = stats {
2824            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2825            assert!(stats.min_opt().unwrap().is_sign_negative());
2826            assert_eq!(stats.max_opt().unwrap(), &2.0);
2827        } else {
2828            panic!("expecting Statistics::Float");
2829        }
2830    }
2831
2832    #[test]
2833    fn test_float_statistics_neg_zero_max() {
2834        let stats = statistics_roundtrip::<FloatType>(&[-0.0, -1.0, f32::NAN, -2.0]);
2835        assert!(stats.is_min_max_backwards_compatible());
2836        if let Statistics::Float(stats) = stats {
2837            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2838            assert_eq!(stats.max_opt().unwrap(), &0.0);
2839            assert!(stats.max_opt().unwrap().is_sign_positive());
2840        } else {
2841            panic!("expecting Statistics::Float");
2842        }
2843    }
2844
2845    #[test]
2846    fn test_double_statistics_nan_middle() {
2847        let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
2848        assert!(stats.is_min_max_backwards_compatible());
2849        if let Statistics::Double(stats) = stats {
2850            assert_eq!(stats.min_opt().unwrap(), &1.0);
2851            assert_eq!(stats.max_opt().unwrap(), &2.0);
2852        } else {
2853            panic!("expecting Statistics::Double");
2854        }
2855    }
2856
2857    #[test]
2858    fn test_double_statistics_nan_start() {
2859        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
2860        assert!(stats.is_min_max_backwards_compatible());
2861        if let Statistics::Double(stats) = stats {
2862            assert_eq!(stats.min_opt().unwrap(), &1.0);
2863            assert_eq!(stats.max_opt().unwrap(), &2.0);
2864        } else {
2865            panic!("expecting Statistics::Double");
2866        }
2867    }
2868
2869    #[test]
2870    fn test_double_statistics_nan_only() {
2871        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, f64::NAN]);
2872        assert!(stats.min_bytes_opt().is_none());
2873        assert!(stats.max_bytes_opt().is_none());
2874        assert!(matches!(stats, Statistics::Double(_)));
2875        assert!(stats.is_min_max_backwards_compatible());
2876    }
2877
2878    #[test]
2879    fn test_double_statistics_zero_only() {
2880        let stats = statistics_roundtrip::<DoubleType>(&[0.0]);
2881        assert!(stats.is_min_max_backwards_compatible());
2882        if let Statistics::Double(stats) = stats {
2883            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2884            assert!(stats.min_opt().unwrap().is_sign_negative());
2885            assert_eq!(stats.max_opt().unwrap(), &0.0);
2886            assert!(stats.max_opt().unwrap().is_sign_positive());
2887        } else {
2888            panic!("expecting Statistics::Double");
2889        }
2890    }
2891
2892    #[test]
2893    fn test_double_statistics_neg_zero_only() {
2894        let stats = statistics_roundtrip::<DoubleType>(&[-0.0]);
2895        assert!(stats.is_min_max_backwards_compatible());
2896        if let Statistics::Double(stats) = stats {
2897            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2898            assert!(stats.min_opt().unwrap().is_sign_negative());
2899            assert_eq!(stats.max_opt().unwrap(), &0.0);
2900            assert!(stats.max_opt().unwrap().is_sign_positive());
2901        } else {
2902            panic!("expecting Statistics::Double");
2903        }
2904    }
2905
2906    #[test]
2907    fn test_double_statistics_zero_min() {
2908        let stats = statistics_roundtrip::<DoubleType>(&[0.0, 1.0, f64::NAN, 2.0]);
2909        assert!(stats.is_min_max_backwards_compatible());
2910        if let Statistics::Double(stats) = stats {
2911            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2912            assert!(stats.min_opt().unwrap().is_sign_negative());
2913            assert_eq!(stats.max_opt().unwrap(), &2.0);
2914        } else {
2915            panic!("expecting Statistics::Double");
2916        }
2917    }
2918
2919    #[test]
2920    fn test_double_statistics_neg_zero_max() {
2921        let stats = statistics_roundtrip::<DoubleType>(&[-0.0, -1.0, f64::NAN, -2.0]);
2922        assert!(stats.is_min_max_backwards_compatible());
2923        if let Statistics::Double(stats) = stats {
2924            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2925            assert_eq!(stats.max_opt().unwrap(), &0.0);
2926            assert!(stats.max_opt().unwrap().is_sign_positive());
2927        } else {
2928            panic!("expecting Statistics::Double");
2929        }
2930    }
2931
2932    #[test]
2933    fn test_compare_greater_byte_array_decimals() {
2934        assert!(!compare_greater_byte_array_decimals(&[], &[],),);
2935        assert!(compare_greater_byte_array_decimals(&[1u8,], &[],),);
2936        assert!(!compare_greater_byte_array_decimals(&[], &[1u8,],),);
2937        assert!(compare_greater_byte_array_decimals(&[1u8,], &[0u8,],),);
2938        assert!(!compare_greater_byte_array_decimals(&[1u8,], &[1u8,],),);
2939        assert!(compare_greater_byte_array_decimals(&[1u8, 0u8,], &[0u8,],),);
2940        assert!(!compare_greater_byte_array_decimals(
2941            &[0u8, 1u8,],
2942            &[1u8, 0u8,],
2943        ),);
2944        assert!(!compare_greater_byte_array_decimals(
2945            &[255u8, 35u8, 0u8, 0u8,],
2946            &[0u8,],
2947        ),);
2948        assert!(compare_greater_byte_array_decimals(
2949            &[0u8,],
2950            &[255u8, 35u8, 0u8, 0u8,],
2951        ),);
2952    }
2953
2954    #[test]
2955    fn test_column_index_with_null_pages() {
2956        // write a single page of all nulls
2957        let page_writer = get_test_page_writer();
2958        let props = Default::default();
2959        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2960        writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap();
2961
2962        let r = writer.close().unwrap();
2963        assert!(r.column_index.is_some());
2964        let col_idx = r.column_index.unwrap();
2965        let col_idx = match col_idx {
2966            ColumnIndexMetaData::INT32(col_idx) => col_idx,
2967            _ => panic!("wrong stats type"),
2968        };
2969        // null_pages should be true for page 0
2970        assert!(col_idx.is_null_page(0));
2971        // min and max should be empty byte arrays
2972        assert!(col_idx.min_value(0).is_none());
2973        assert!(col_idx.max_value(0).is_none());
2974        // null_counts should be defined and be 4 for page 0
2975        assert!(col_idx.null_count(0).is_some());
2976        assert_eq!(col_idx.null_count(0), Some(4));
2977        // there is no repetition so rep histogram should be absent
2978        assert!(col_idx.repetition_level_histogram(0).is_none());
2979        // definition_level_histogram should be present and should be 0:4, 1:0
2980        assert!(col_idx.definition_level_histogram(0).is_some());
2981        assert_eq!(col_idx.definition_level_histogram(0).unwrap(), &[4, 0]);
2982    }
2983
2984    #[test]
2985    fn test_column_offset_index_metadata() {
2986        // write data
2987        // and check the offset index and column index
2988        let page_writer = get_test_page_writer();
2989        let props = Default::default();
2990        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2991        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2992        // first page
2993        writer.flush_data_pages().unwrap();
2994        // second page
2995        writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
2996
2997        let r = writer.close().unwrap();
2998        let column_index = r.column_index.unwrap();
2999        let offset_index = r.offset_index.unwrap();
3000
3001        assert_eq!(8, r.rows_written);
3002
3003        // column index
3004        let column_index = match column_index {
3005            ColumnIndexMetaData::INT32(column_index) => column_index,
3006            _ => panic!("wrong stats type"),
3007        };
3008        assert_eq!(2, column_index.num_pages());
3009        assert_eq!(2, offset_index.page_locations.len());
3010        assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
3011        for idx in 0..2 {
3012            assert!(!column_index.is_null_page(idx));
3013            assert_eq!(0, column_index.null_count(0).unwrap());
3014        }
3015
3016        if let Some(stats) = r.metadata.statistics() {
3017            assert_eq!(stats.null_count_opt(), Some(0));
3018            assert_eq!(stats.distinct_count_opt(), None);
3019            if let Statistics::Int32(stats) = stats {
3020                // first page is [1,2,3,4]
3021                // second page is [-5,2,4,8]
3022                // note that we don't increment here, as this is a non BinaryArray type.
3023                assert_eq!(stats.min_opt(), column_index.min_value(1));
3024                assert_eq!(stats.max_opt(), column_index.max_value(1));
3025            } else {
3026                panic!("expecting Statistics::Int32");
3027            }
3028        } else {
3029            panic!("metadata missing statistics");
3030        }
3031
3032        // page location
3033        assert_eq!(0, offset_index.page_locations[0].first_row_index);
3034        assert_eq!(4, offset_index.page_locations[1].first_row_index);
3035    }
3036
3037    /// Verify min/max value truncation in the column index works as expected
3038    #[test]
3039    fn test_column_offset_index_metadata_truncating() {
3040        // write data
3041        // and check the offset index and column index
3042        let page_writer = get_test_page_writer();
3043        let props = WriterProperties::builder()
3044            .set_statistics_truncate_length(None) // disable column index truncation
3045            .build()
3046            .into();
3047        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3048
3049        let mut data = vec![FixedLenByteArray::default(); 3];
3050        // This is the expected min value - "aaa..."
3051        data[0].set_data(Bytes::from(vec![97_u8; 200]));
3052        // This is the expected max value - "ZZZ..."
3053        data[1].set_data(Bytes::from(vec![112_u8; 200]));
3054        data[2].set_data(Bytes::from(vec![98_u8; 200]));
3055
3056        writer.write_batch(&data, None, None).unwrap();
3057
3058        writer.flush_data_pages().unwrap();
3059
3060        let r = writer.close().unwrap();
3061        let column_index = r.column_index.unwrap();
3062        let offset_index = r.offset_index.unwrap();
3063
3064        let column_index = match column_index {
3065            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3066            _ => panic!("wrong stats type"),
3067        };
3068
3069        assert_eq!(3, r.rows_written);
3070
3071        // column index
3072        assert_eq!(1, column_index.num_pages());
3073        assert_eq!(1, offset_index.page_locations.len());
3074        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
3075        assert!(!column_index.is_null_page(0));
3076        assert_eq!(Some(0), column_index.null_count(0));
3077
3078        if let Some(stats) = r.metadata.statistics() {
3079            assert_eq!(stats.null_count_opt(), Some(0));
3080            assert_eq!(stats.distinct_count_opt(), None);
3081            if let Statistics::FixedLenByteArray(stats) = stats {
3082                let column_index_min_value = column_index.min_value(0).unwrap();
3083                let column_index_max_value = column_index.max_value(0).unwrap();
3084
3085                // Column index stats are truncated, while the column chunk's aren't.
3086                assert_ne!(stats.min_bytes_opt().unwrap(), column_index_min_value);
3087                assert_ne!(stats.max_bytes_opt().unwrap(), column_index_max_value);
3088
3089                assert_eq!(
3090                    column_index_min_value.len(),
3091                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
3092                );
3093                assert_eq!(column_index_min_value, &[97_u8; 64]);
3094                assert_eq!(
3095                    column_index_max_value.len(),
3096                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
3097                );
3098
3099                // We expect the last byte to be incremented
3100                assert_eq!(
3101                    *column_index_max_value.last().unwrap(),
3102                    *column_index_max_value.first().unwrap() + 1
3103                );
3104            } else {
3105                panic!("expecting Statistics::FixedLenByteArray");
3106            }
3107        } else {
3108            panic!("metadata missing statistics");
3109        }
3110    }
3111
3112    #[test]
3113    fn test_column_offset_index_truncating_spec_example() {
3114        // write data
3115        // and check the offset index and column index
3116        let page_writer = get_test_page_writer();
3117
3118        // Truncate values at 1 byte
3119        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3120        let props = Arc::new(builder.build());
3121        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3122
3123        let mut data = vec![FixedLenByteArray::default(); 1];
3124        // This is the expected min value
3125        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3126
3127        writer.write_batch(&data, None, None).unwrap();
3128
3129        writer.flush_data_pages().unwrap();
3130
3131        let r = writer.close().unwrap();
3132        let column_index = r.column_index.unwrap();
3133        let offset_index = r.offset_index.unwrap();
3134
3135        let column_index = match column_index {
3136            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3137            _ => panic!("wrong stats type"),
3138        };
3139
3140        assert_eq!(1, r.rows_written);
3141
3142        // column index
3143        assert_eq!(1, column_index.num_pages());
3144        assert_eq!(1, offset_index.page_locations.len());
3145        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
3146        assert!(!column_index.is_null_page(0));
3147        assert_eq!(Some(0), column_index.null_count(0));
3148
3149        if let Some(stats) = r.metadata.statistics() {
3150            assert_eq!(stats.null_count_opt(), Some(0));
3151            assert_eq!(stats.distinct_count_opt(), None);
3152            if let Statistics::FixedLenByteArray(_stats) = stats {
3153                let column_index_min_value = column_index.min_value(0).unwrap();
3154                let column_index_max_value = column_index.max_value(0).unwrap();
3155
3156                assert_eq!(column_index_min_value.len(), 1);
3157                assert_eq!(column_index_max_value.len(), 1);
3158
3159                assert_eq!("B".as_bytes(), column_index_min_value);
3160                assert_eq!("C".as_bytes(), column_index_max_value);
3161
3162                assert_ne!(column_index_min_value, stats.min_bytes_opt().unwrap());
3163                assert_ne!(column_index_max_value, stats.max_bytes_opt().unwrap());
3164            } else {
3165                panic!("expecting Statistics::FixedLenByteArray");
3166            }
3167        } else {
3168            panic!("metadata missing statistics");
3169        }
3170    }
3171
3172    #[test]
3173    fn test_float16_min_max_no_truncation() {
3174        // Even if we set truncation to occur at 1 byte, we should not truncate for Float16
3175        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3176        let props = Arc::new(builder.build());
3177        let page_writer = get_test_page_writer();
3178        let mut writer = get_test_float16_column_writer(page_writer, props);
3179
3180        let expected_value = f16::PI.to_le_bytes().to_vec();
3181        let data = vec![ByteArray::from(expected_value.clone()).into()];
3182        writer.write_batch(&data, None, None).unwrap();
3183        writer.flush_data_pages().unwrap();
3184
3185        let r = writer.close().unwrap();
3186
3187        // stats should still be written
3188        // ensure bytes weren't truncated for column index
3189        let column_index = r.column_index.unwrap();
3190        let column_index = match column_index {
3191            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3192            _ => panic!("wrong stats type"),
3193        };
3194        let column_index_min_bytes = column_index.min_value(0).unwrap();
3195        let column_index_max_bytes = column_index.max_value(0).unwrap();
3196        assert_eq!(expected_value, column_index_min_bytes);
3197        assert_eq!(expected_value, column_index_max_bytes);
3198
3199        // ensure bytes weren't truncated for statistics
3200        let stats = r.metadata.statistics().unwrap();
3201        if let Statistics::FixedLenByteArray(stats) = stats {
3202            let stats_min_bytes = stats.min_bytes_opt().unwrap();
3203            let stats_max_bytes = stats.max_bytes_opt().unwrap();
3204            assert_eq!(expected_value, stats_min_bytes);
3205            assert_eq!(expected_value, stats_max_bytes);
3206        } else {
3207            panic!("expecting Statistics::FixedLenByteArray");
3208        }
3209    }
3210
3211    #[test]
3212    fn test_decimal_min_max_no_truncation() {
3213        // Even if we set truncation to occur at 1 byte, we should not truncate for Decimal
3214        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3215        let props = Arc::new(builder.build());
3216        let page_writer = get_test_page_writer();
3217        let mut writer =
3218            get_test_decimals_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3219
3220        let expected_value = vec![
3221            255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8,
3222            231u8, 90u8, 0u8, 0u8,
3223        ];
3224        let data = vec![ByteArray::from(expected_value.clone()).into()];
3225        writer.write_batch(&data, None, None).unwrap();
3226        writer.flush_data_pages().unwrap();
3227
3228        let r = writer.close().unwrap();
3229
3230        // stats should still be written
3231        // ensure bytes weren't truncated for column index
3232        let column_index = r.column_index.unwrap();
3233        let column_index = match column_index {
3234            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3235            _ => panic!("wrong stats type"),
3236        };
3237        let column_index_min_bytes = column_index.min_value(0).unwrap();
3238        let column_index_max_bytes = column_index.max_value(0).unwrap();
3239        assert_eq!(expected_value, column_index_min_bytes);
3240        assert_eq!(expected_value, column_index_max_bytes);
3241
3242        // ensure bytes weren't truncated for statistics
3243        let stats = r.metadata.statistics().unwrap();
3244        if let Statistics::FixedLenByteArray(stats) = stats {
3245            let stats_min_bytes = stats.min_bytes_opt().unwrap();
3246            let stats_max_bytes = stats.max_bytes_opt().unwrap();
3247            assert_eq!(expected_value, stats_min_bytes);
3248            assert_eq!(expected_value, stats_max_bytes);
3249        } else {
3250            panic!("expecting Statistics::FixedLenByteArray");
3251        }
3252    }
3253
3254    #[test]
3255    fn test_statistics_truncating_byte_array_default() {
3256        let page_writer = get_test_page_writer();
3257
3258        // The default truncate length is 64 bytes
3259        let props = WriterProperties::builder().build().into();
3260        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
3261
3262        let mut data = vec![ByteArray::default(); 1];
3263        data[0].set_data(Bytes::from(String::from(
3264            "This string is longer than 64 bytes, so it will almost certainly be truncated.",
3265        )));
3266        writer.write_batch(&data, None, None).unwrap();
3267        writer.flush_data_pages().unwrap();
3268
3269        let r = writer.close().unwrap();
3270
3271        assert_eq!(1, r.rows_written);
3272
3273        let stats = r.metadata.statistics().expect("statistics");
3274        if let Statistics::ByteArray(_stats) = stats {
3275            let min_value = _stats.min_opt().unwrap();
3276            let max_value = _stats.max_opt().unwrap();
3277
3278            assert!(!_stats.min_is_exact());
3279            assert!(!_stats.max_is_exact());
3280
3281            let expected_len = 64;
3282            assert_eq!(min_value.len(), expected_len);
3283            assert_eq!(max_value.len(), expected_len);
3284
3285            let expected_min =
3286                "This string is longer than 64 bytes, so it will almost certainly".as_bytes();
3287            assert_eq!(expected_min, min_value.as_bytes());
3288            // note the max value is different from the min value: the last byte is incremented
3289            let expected_max =
3290                "This string is longer than 64 bytes, so it will almost certainlz".as_bytes();
3291            assert_eq!(expected_max, max_value.as_bytes());
3292        } else {
3293            panic!("expecting Statistics::ByteArray");
3294        }
3295    }
3296
3297    #[test]
3298    fn test_statistics_truncating_byte_array() {
3299        let page_writer = get_test_page_writer();
3300
3301        const TEST_TRUNCATE_LENGTH: usize = 1;
3302
3303        // Truncate values at 1 byte
3304        let builder =
3305            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3306        let props = Arc::new(builder.build());
3307        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
3308
3309        let mut data = vec![ByteArray::default(); 1];
3310        // This is the expected min value
3311        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3312
3313        writer.write_batch(&data, None, None).unwrap();
3314
3315        writer.flush_data_pages().unwrap();
3316
3317        let r = writer.close().unwrap();
3318
3319        assert_eq!(1, r.rows_written);
3320
3321        let stats = r.metadata.statistics().expect("statistics");
3322        assert_eq!(stats.null_count_opt(), Some(0));
3323        assert_eq!(stats.distinct_count_opt(), None);
3324        if let Statistics::ByteArray(_stats) = stats {
3325            let min_value = _stats.min_opt().unwrap();
3326            let max_value = _stats.max_opt().unwrap();
3327
3328            assert!(!_stats.min_is_exact());
3329            assert!(!_stats.max_is_exact());
3330
3331            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3332            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3333
3334            assert_eq!("B".as_bytes(), min_value.as_bytes());
3335            assert_eq!("C".as_bytes(), max_value.as_bytes());
3336        } else {
3337            panic!("expecting Statistics::ByteArray");
3338        }
3339    }
3340
3341    #[test]
3342    fn test_statistics_truncating_fixed_len_byte_array() {
3343        let page_writer = get_test_page_writer();
3344
3345        const TEST_TRUNCATE_LENGTH: usize = 1;
3346
3347        // Truncate values at 1 byte
3348        let builder =
3349            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3350        let props = Arc::new(builder.build());
3351        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3352
3353        let mut data = vec![FixedLenByteArray::default(); 1];
3354
3355        const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
3356        const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();
3357
3358        const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]]; // parquet specifies big-endian order for decimals
3359        const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] =
3360            [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0];
3361
3362        // This is the expected min value
3363        data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));
3364
3365        writer.write_batch(&data, None, None).unwrap();
3366
3367        writer.flush_data_pages().unwrap();
3368
3369        let r = writer.close().unwrap();
3370
3371        assert_eq!(1, r.rows_written);
3372
3373        let stats = r.metadata.statistics().expect("statistics");
3374        assert_eq!(stats.null_count_opt(), Some(0));
3375        assert_eq!(stats.distinct_count_opt(), None);
3376        if let Statistics::FixedLenByteArray(_stats) = stats {
3377            let min_value = _stats.min_opt().unwrap();
3378            let max_value = _stats.max_opt().unwrap();
3379
3380            assert!(!_stats.min_is_exact());
3381            assert!(!_stats.max_is_exact());
3382
3383            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3384            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3385
3386            assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
3387            assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());
3388
3389            let reconstructed_min = i128::from_be_bytes([
3390                min_value.as_bytes()[0],
3391                0,
3392                0,
3393                0,
3394                0,
3395                0,
3396                0,
3397                0,
3398                0,
3399                0,
3400                0,
3401                0,
3402                0,
3403                0,
3404                0,
3405                0,
3406            ]);
3407
3408            let reconstructed_max = i128::from_be_bytes([
3409                max_value.as_bytes()[0],
3410                0,
3411                0,
3412                0,
3413                0,
3414                0,
3415                0,
3416                0,
3417                0,
3418                0,
3419                0,
3420                0,
3421                0,
3422                0,
3423                0,
3424                0,
3425            ]);
3426
3427            // check that the inner value is correctly bounded by the min/max
3428            println!("min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}");
3429            assert!(reconstructed_min <= PSEUDO_DECIMAL_VALUE);
3430            println!("max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}");
3431            assert!(reconstructed_max >= PSEUDO_DECIMAL_VALUE);
3432        } else {
3433            panic!("expecting Statistics::FixedLenByteArray");
3434        }
3435    }
3436
3437    #[test]
3438    fn test_send() {
3439        fn test<T: Send>() {}
3440        test::<ColumnWriterImpl<Int32Type>>();
3441    }
3442
3443    #[test]
3444    fn test_increment() {
3445        let v = increment(vec![0, 0, 0]).unwrap();
3446        assert_eq!(&v, &[0, 0, 1]);
3447
3448        // Handle overflow
3449        let v = increment(vec![0, 255, 255]).unwrap();
3450        assert_eq!(&v, &[1, 0, 0]);
3451
3452        // Return `None` if all bytes are u8::MAX
3453        let v = increment(vec![255, 255, 255]);
3454        assert!(v.is_none());
3455    }
3456
3457    #[test]
3458    fn test_increment_utf8() {
3459        let test_inc = |o: &str, expected: &str| {
3460            if let Ok(v) = String::from_utf8(increment_utf8(o).unwrap()) {
3461                // Got the expected result...
3462                assert_eq!(v, expected);
3463                // and it's greater than the original string
3464                assert!(*v > *o);
3465                // Also show that BinaryArray level comparison works here
3466                let mut greater = ByteArray::new();
3467                greater.set_data(Bytes::from(v));
3468                let mut original = ByteArray::new();
3469                original.set_data(Bytes::from(o.as_bytes().to_vec()));
3470                assert!(greater > original);
3471            } else {
3472                panic!("Expected incremented UTF8 string to also be valid.");
3473            }
3474        };
3475
3476        // Basic ASCII case
3477        test_inc("hello", "hellp");
3478
3479        // 1-byte ending in max 1-byte
3480        test_inc("a\u{7f}", "b");
3481
3482        // 1-byte max should not truncate as it would need 2-byte code points
3483        assert!(increment_utf8("\u{7f}\u{7f}").is_none());
3484
3485        // UTF8 string
3486        test_inc("❤️🧡💛💚💙💜", "❤️🧡💛💚💙💝");
3487
3488        // 2-byte without overflow
3489        test_inc("éééé", "éééê");
3490
3491        // 2-byte that overflows lowest byte
3492        test_inc("\u{ff}\u{ff}", "\u{ff}\u{100}");
3493
3494        // 2-byte ending in max 2-byte
3495        test_inc("a\u{7ff}", "b");
3496
3497        // Max 2-byte should not truncate as it would need 3-byte code points
3498        assert!(increment_utf8("\u{7ff}\u{7ff}").is_none());
3499
3500        // 3-byte without overflow [U+800, U+800] -> [U+800, U+801] (note that these
3501        // characters should render right to left).
3502        test_inc("ࠀࠀ", "ࠀࠁ");
3503
3504        // 3-byte ending in max 3-byte
3505        test_inc("a\u{ffff}", "b");
3506
3507        // Max 3-byte should not truncate as it would need 4-byte code points
3508        assert!(increment_utf8("\u{ffff}\u{ffff}").is_none());
3509
3510        // 4-byte without overflow
3511        test_inc("𐀀𐀀", "𐀀𐀁");
3512
3513        // 4-byte ending in max unicode
3514        test_inc("a\u{10ffff}", "b");
3515
3516        // Max 4-byte should not truncate
3517        assert!(increment_utf8("\u{10ffff}\u{10ffff}").is_none());
3518
3519        // Skip over surrogate pair range (0xD800..=0xDFFF)
3520        //test_inc("a\u{D7FF}", "a\u{e000}");
3521        test_inc("a\u{D7FF}", "b");
3522    }
3523
3524    #[test]
3525    fn test_truncate_utf8() {
3526        // No-op
3527        let data = "❤️🧡💛💚💙💜";
3528        let r = truncate_utf8(data, data.len()).unwrap();
3529        assert_eq!(r.len(), data.len());
3530        assert_eq!(&r, data.as_bytes());
3531
3532        // We slice it away from the UTF8 boundary
3533        let r = truncate_utf8(data, 13).unwrap();
3534        assert_eq!(r.len(), 10);
3535        assert_eq!(&r, "❤️🧡".as_bytes());
3536
3537        // One multi-byte code point, and a length shorter than it, so we can't slice it
3538        let r = truncate_utf8("\u{0836}", 1);
3539        assert!(r.is_none());
3540
3541        // Test truncate and increment for max bounds on UTF-8 statistics
3542        // 7-bit (i.e. ASCII)
3543        let r = truncate_and_increment_utf8("yyyyyyyyy", 8).unwrap();
3544        assert_eq!(&r, "yyyyyyyz".as_bytes());
3545
3546        // 2-byte without overflow
3547        let r = truncate_and_increment_utf8("ééééé", 7).unwrap();
3548        assert_eq!(&r, "ééê".as_bytes());
3549
3550        // 2-byte that overflows lowest byte
3551        let r = truncate_and_increment_utf8("\u{ff}\u{ff}\u{ff}\u{ff}\u{ff}", 8).unwrap();
3552        assert_eq!(&r, "\u{ff}\u{ff}\u{ff}\u{100}".as_bytes());
3553
3554        // max 2-byte should not truncate as it would need 3-byte code points
3555        let r = truncate_and_increment_utf8("߿߿߿߿߿", 8);
3556        assert!(r.is_none());
3557
3558        // 3-byte without overflow [U+800, U+800, U+800] -> [U+800, U+801] (note that these
3559        // characters should render right to left).
3560        let r = truncate_and_increment_utf8("ࠀࠀࠀࠀ", 8).unwrap();
3561        assert_eq!(&r, "ࠀࠁ".as_bytes());
3562
3563        // max 3-byte should not truncate as it would need 4-byte code points
3564        let r = truncate_and_increment_utf8("\u{ffff}\u{ffff}\u{ffff}", 8);
3565        assert!(r.is_none());
3566
3567        // 4-byte without overflow
3568        let r = truncate_and_increment_utf8("𐀀𐀀𐀀𐀀", 9).unwrap();
3569        assert_eq!(&r, "𐀀𐀁".as_bytes());
3570
3571        // max 4-byte should not truncate
3572        let r = truncate_and_increment_utf8("\u{10ffff}\u{10ffff}", 8);
3573        assert!(r.is_none());
3574    }
3575
3576    #[test]
3577    // Check fallback truncation of statistics that should be UTF-8, but aren't
3578    // (see https://github.com/apache/arrow-rs/pull/6870).
3579    fn test_byte_array_truncate_invalid_utf8_statistics() {
3580        let message_type = "
3581            message test_schema {
3582                OPTIONAL BYTE_ARRAY a (UTF8);
3583            }
3584        ";
3585        let schema = Arc::new(parse_message_type(message_type).unwrap());
3586
3587        // Create Vec<ByteArray> containing non-UTF8 bytes
3588        let data = vec![ByteArray::from(vec![128u8; 32]); 7];
3589        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
3590        let file: File = tempfile::tempfile().unwrap();
3591        let props = Arc::new(
3592            WriterProperties::builder()
3593                .set_statistics_enabled(EnabledStatistics::Chunk)
3594                .set_statistics_truncate_length(Some(8))
3595                .build(),
3596        );
3597
3598        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
3599        let mut row_group_writer = writer.next_row_group().unwrap();
3600
3601        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
3602        col_writer
3603            .typed::<ByteArrayType>()
3604            .write_batch(&data, Some(&def_levels), None)
3605            .unwrap();
3606        col_writer.close().unwrap();
3607        row_group_writer.close().unwrap();
3608        let file_metadata = writer.close().unwrap();
3609        let stats = file_metadata.row_group(0).column(0).statistics().unwrap();
3610        assert!(!stats.max_is_exact());
3611        // Truncation of invalid UTF-8 should fall back to binary truncation, so last byte should
3612        // be incremented by 1.
3613        assert_eq!(
3614            stats.max_bytes_opt().map(|v| v.to_vec()),
3615            Some([128, 128, 128, 128, 128, 128, 128, 129].to_vec())
3616        );
3617    }
3618
3619    #[test]
3620    fn test_increment_max_binary_chars() {
3621        let r = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]);
3622        assert_eq!(&r.unwrap(), &[0xFF, 0xFE, 0xFE, 0x00, 0x00]);
3623
3624        let incremented = increment(vec![0xFF, 0xFF, 0xFF]);
3625        assert!(incremented.is_none())
3626    }
3627
3628    #[test]
3629    fn test_no_column_index_when_stats_disabled() {
3630        // https://github.com/apache/arrow-rs/issues/6010
3631        // Test that column index is not created/written for all-nulls column when page
3632        // statistics are disabled.
3633        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3634        let props = Arc::new(
3635            WriterProperties::builder()
3636                .set_statistics_enabled(EnabledStatistics::None)
3637                .build(),
3638        );
3639        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3640        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3641
3642        let data = Vec::new();
3643        let def_levels = vec![0; 10];
3644        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3645        writer.flush_data_pages().unwrap();
3646
3647        let column_close_result = writer.close().unwrap();
3648        assert!(column_close_result.offset_index.is_some());
3649        assert!(column_close_result.column_index.is_none());
3650    }
3651
3652    #[test]
3653    fn test_no_offset_index_when_disabled() {
3654        // Test that offset indexes can be disabled
3655        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3656        let props = Arc::new(
3657            WriterProperties::builder()
3658                .set_statistics_enabled(EnabledStatistics::None)
3659                .set_offset_index_disabled(true)
3660                .build(),
3661        );
3662        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3663        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3664
3665        let data = Vec::new();
3666        let def_levels = vec![0; 10];
3667        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3668        writer.flush_data_pages().unwrap();
3669
3670        let column_close_result = writer.close().unwrap();
3671        assert!(column_close_result.offset_index.is_none());
3672        assert!(column_close_result.column_index.is_none());
3673    }
3674
3675    #[test]
3676    fn test_offset_index_overridden() {
3677        // Test that offset indexes are not disabled when gathering page statistics
3678        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3679        let props = Arc::new(
3680            WriterProperties::builder()
3681                .set_statistics_enabled(EnabledStatistics::Page)
3682                .set_offset_index_disabled(true)
3683                .build(),
3684        );
3685        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3686        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3687
3688        let data = Vec::new();
3689        let def_levels = vec![0; 10];
3690        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3691        writer.flush_data_pages().unwrap();
3692
3693        let column_close_result = writer.close().unwrap();
3694        assert!(column_close_result.offset_index.is_some());
3695        assert!(column_close_result.column_index.is_some());
3696    }
3697
3698    #[test]
3699    fn test_boundary_order() -> Result<()> {
3700        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3701        // min max both ascending
3702        let column_close_result = write_multiple_pages::<Int32Type>(
3703            &descr,
3704            &[
3705                &[Some(-10), Some(10)],
3706                &[Some(-5), Some(11)],
3707                &[None],
3708                &[Some(-5), Some(11)],
3709            ],
3710        )?;
3711        let boundary_order = column_close_result
3712            .column_index
3713            .unwrap()
3714            .get_boundary_order();
3715        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3716
3717        // min max both descending
3718        let column_close_result = write_multiple_pages::<Int32Type>(
3719            &descr,
3720            &[
3721                &[Some(10), Some(11)],
3722                &[Some(5), Some(11)],
3723                &[None],
3724                &[Some(-5), Some(0)],
3725            ],
3726        )?;
3727        let boundary_order = column_close_result
3728            .column_index
3729            .unwrap()
3730            .get_boundary_order();
3731        assert_eq!(boundary_order, Some(BoundaryOrder::DESCENDING));
3732
3733        // min max both equal
3734        let column_close_result = write_multiple_pages::<Int32Type>(
3735            &descr,
3736            &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]],
3737        )?;
3738        let boundary_order = column_close_result
3739            .column_index
3740            .unwrap()
3741            .get_boundary_order();
3742        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3743
3744        // only nulls
3745        let column_close_result =
3746            write_multiple_pages::<Int32Type>(&descr, &[&[None], &[None], &[None]])?;
3747        let boundary_order = column_close_result
3748            .column_index
3749            .unwrap()
3750            .get_boundary_order();
3751        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3752
3753        // one page
3754        let column_close_result =
3755            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)]])?;
3756        let boundary_order = column_close_result
3757            .column_index
3758            .unwrap()
3759            .get_boundary_order();
3760        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3761
3762        // one non-null page
3763        let column_close_result =
3764            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)], &[None]])?;
3765        let boundary_order = column_close_result
3766            .column_index
3767            .unwrap()
3768            .get_boundary_order();
3769        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3770
3771        // min max both unordered
3772        let column_close_result = write_multiple_pages::<Int32Type>(
3773            &descr,
3774            &[
3775                &[Some(10), Some(11)],
3776                &[Some(11), Some(16)],
3777                &[None],
3778                &[Some(-5), Some(0)],
3779            ],
3780        )?;
3781        let boundary_order = column_close_result
3782            .column_index
3783            .unwrap()
3784            .get_boundary_order();
3785        assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));
3786
3787        // min max both ordered in different orders
3788        let column_close_result = write_multiple_pages::<Int32Type>(
3789            &descr,
3790            &[
3791                &[Some(1), Some(9)],
3792                &[Some(2), Some(8)],
3793                &[None],
3794                &[Some(3), Some(7)],
3795            ],
3796        )?;
3797        let boundary_order = column_close_result
3798            .column_index
3799            .unwrap()
3800            .get_boundary_order();
3801        assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));
3802
3803        Ok(())
3804    }
3805
3806    #[test]
3807    fn test_boundary_order_logical_type() -> Result<()> {
3808        // ensure that logical types account for different sort order than underlying
3809        // physical type representation
3810        let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));
3811        let fba_descr = {
3812            let tpe = SchemaType::primitive_type_builder(
3813                "col",
3814                FixedLenByteArrayType::get_physical_type(),
3815            )
3816            .with_length(2)
3817            .build()?;
3818            Arc::new(ColumnDescriptor::new(
3819                Arc::new(tpe),
3820                1,
3821                0,
3822                ColumnPath::from("col"),
3823            ))
3824        };
3825
3826        let values: &[&[Option<FixedLenByteArray>]] = &[
3827            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ONE)))],
3828            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ZERO)))],
3829            &[Some(FixedLenByteArray::from(ByteArray::from(
3830                f16::NEG_ZERO,
3831            )))],
3832            &[Some(FixedLenByteArray::from(ByteArray::from(f16::NEG_ONE)))],
3833        ];
3834
3835        // f16 descending
3836        let column_close_result =
3837            write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?;
3838        let boundary_order = column_close_result
3839            .column_index
3840            .unwrap()
3841            .get_boundary_order();
3842        assert_eq!(boundary_order, Some(BoundaryOrder::DESCENDING));
3843
3844        // same bytes, but fba unordered
3845        let column_close_result =
3846            write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?;
3847        let boundary_order = column_close_result
3848            .column_index
3849            .unwrap()
3850            .get_boundary_order();
3851        assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));
3852
3853        Ok(())
3854    }
3855
3856    #[test]
3857    fn test_interval_stats_should_not_have_min_max() {
3858        let input = [
3859            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
3860            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
3861            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2],
3862        ]
3863        .into_iter()
3864        .map(|s| ByteArray::from(s).into())
3865        .collect::<Vec<_>>();
3866
3867        let page_writer = get_test_page_writer();
3868        let mut writer = get_test_interval_column_writer(page_writer);
3869        writer.write_batch(&input, None, None).unwrap();
3870
3871        let metadata = writer.close().unwrap().metadata;
3872        let stats = if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
3873            stats.clone()
3874        } else {
3875            panic!("metadata missing statistics");
3876        };
3877        assert!(stats.min_bytes_opt().is_none());
3878        assert!(stats.max_bytes_opt().is_none());
3879    }
3880
3881    #[test]
3882    #[cfg(feature = "arrow")]
3883    fn test_column_writer_get_estimated_total_bytes() {
3884        let page_writer = get_test_page_writer();
3885        let props = Default::default();
3886        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
3887        assert_eq!(writer.get_estimated_total_bytes(), 0);
3888
3889        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
3890        writer.add_data_page().unwrap();
3891        let size_with_one_page = writer.get_estimated_total_bytes();
3892        assert_eq!(size_with_one_page, 20);
3893
3894        writer.write_batch(&[5, 6, 7, 8], None, None).unwrap();
3895        writer.add_data_page().unwrap();
3896        let size_with_two_pages = writer.get_estimated_total_bytes();
3897        // different pages have different compressed lengths
3898        assert_eq!(size_with_two_pages, 20 + 21);
3899    }
3900
3901    fn write_multiple_pages<T: DataType>(
3902        column_descr: &Arc<ColumnDescriptor>,
3903        pages: &[&[Option<T::T>]],
3904    ) -> Result<ColumnCloseResult> {
3905        let column_writer = get_column_writer(
3906            column_descr.clone(),
3907            Default::default(),
3908            get_test_page_writer(),
3909        );
3910        let mut writer = get_typed_column_writer::<T>(column_writer);
3911
3912        for &page in pages {
3913            let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
3914            let def_levels = page
3915                .iter()
3916                .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
3917                .collect::<Vec<_>>();
3918            writer.write_batch(&values, Some(&def_levels), None)?;
3919            writer.flush_data_pages()?;
3920        }
3921
3922        writer.close()
3923    }
3924
3925    /// Performs write-read roundtrip with randomly generated values and levels.
3926    /// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
3927    /// for a column.
3928    fn column_roundtrip_random<T: DataType>(
3929        props: WriterProperties,
3930        max_size: usize,
3931        min_value: T::T,
3932        max_value: T::T,
3933        max_def_level: i16,
3934        max_rep_level: i16,
3935    ) where
3936        T::T: PartialOrd + SampleUniform + Copy,
3937    {
3938        let mut num_values: usize = 0;
3939
3940        let mut buf: Vec<i16> = Vec::new();
3941        let def_levels = if max_def_level > 0 {
3942            random_numbers_range(max_size, 0, max_def_level + 1, &mut buf);
3943            for &dl in &buf[..] {
3944                if dl == max_def_level {
3945                    num_values += 1;
3946                }
3947            }
3948            Some(&buf[..])
3949        } else {
3950            num_values = max_size;
3951            None
3952        };
3953
3954        let mut buf: Vec<i16> = Vec::new();
3955        let rep_levels = if max_rep_level > 0 {
3956            random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
3957            buf[0] = 0; // Must start on record boundary
3958            Some(&buf[..])
3959        } else {
3960            None
3961        };
3962
3963        let mut values: Vec<T::T> = Vec::new();
3964        random_numbers_range(num_values, min_value, max_value, &mut values);
3965
3966        column_roundtrip::<T>(props, &values[..], def_levels, rep_levels);
3967    }
3968
3969    /// Performs write-read roundtrip and asserts written values and levels.
3970    fn column_roundtrip<T: DataType>(
3971        props: WriterProperties,
3972        values: &[T::T],
3973        def_levels: Option<&[i16]>,
3974        rep_levels: Option<&[i16]>,
3975    ) {
3976        let mut file = tempfile::tempfile().unwrap();
3977        let mut write = TrackedWrite::new(&mut file);
3978        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
3979
3980        let max_def_level = match def_levels {
3981            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3982            None => 0i16,
3983        };
3984
3985        let max_rep_level = match rep_levels {
3986            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3987            None => 0i16,
3988        };
3989
3990        let mut max_batch_size = values.len();
3991        if let Some(levels) = def_levels {
3992            max_batch_size = max_batch_size.max(levels.len());
3993        }
3994        if let Some(levels) = rep_levels {
3995            max_batch_size = max_batch_size.max(levels.len());
3996        }
3997
3998        let mut writer =
3999            get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));
4000
4001        let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
4002        assert_eq!(values_written, values.len());
4003        let result = writer.close().unwrap();
4004
4005        drop(write);
4006
4007        let props = ReaderProperties::builder()
4008            .set_backward_compatible_lz4(false)
4009            .build();
4010        let page_reader = Box::new(
4011            SerializedPageReader::new_with_properties(
4012                Arc::new(file),
4013                &result.metadata,
4014                result.rows_written as usize,
4015                None,
4016                Arc::new(props),
4017            )
4018            .unwrap(),
4019        );
4020        let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level);
4021
4022        let mut actual_values = Vec::with_capacity(max_batch_size);
4023        let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size));
4024        let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size));
4025
4026        let (_, values_read, levels_read) = reader
4027            .read_records(
4028                max_batch_size,
4029                actual_def_levels.as_mut(),
4030                actual_rep_levels.as_mut(),
4031                &mut actual_values,
4032            )
4033            .unwrap();
4034
4035        // Assert values, definition and repetition levels.
4036
4037        assert_eq!(&actual_values[..values_read], values);
4038        match actual_def_levels {
4039            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels),
4040            None => assert_eq!(None, def_levels),
4041        }
4042        match actual_rep_levels {
4043            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels),
4044            None => assert_eq!(None, rep_levels),
4045        }
4046
4047        // Assert written rows.
4048
4049        if let Some(levels) = actual_rep_levels {
4050            let mut actual_rows_written = 0;
4051            for l in levels {
4052                if l == 0 {
4053                    actual_rows_written += 1;
4054                }
4055            }
4056            assert_eq!(actual_rows_written, result.rows_written);
4057        } else if actual_def_levels.is_some() {
4058            assert_eq!(levels_read as u64, result.rows_written);
4059        } else {
4060            assert_eq!(values_read as u64, result.rows_written);
4061        }
4062    }
4063
4064    /// Performs write of provided values and returns column metadata of those values.
4065    /// Used to test encoding support for column writer.
4066    fn column_write_and_get_metadata<T: DataType>(
4067        props: WriterProperties,
4068        values: &[T::T],
4069    ) -> ColumnChunkMetaData {
4070        let page_writer = get_test_page_writer();
4071        let props = Arc::new(props);
4072        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4073        writer.write_batch(values, None, None).unwrap();
4074        writer.close().unwrap().metadata
4075    }
4076
4077    // Helper function to more compactly create a PageEncodingStats struct.
4078    fn encoding_stats(page_type: PageType, encoding: Encoding, count: i32) -> PageEncodingStats {
4079        PageEncodingStats {
4080            page_type,
4081            encoding,
4082            count,
4083        }
4084    }
4085
4086    // Function to use in tests for EncodingWriteSupport. This checks that dictionary
4087    // offset and encodings to make sure that column writer uses provided by trait
4088    // encodings.
4089    fn check_encoding_write_support<T: DataType>(
4090        version: WriterVersion,
4091        dict_enabled: bool,
4092        data: &[T::T],
4093        dictionary_page_offset: Option<i64>,
4094        encodings: &[Encoding],
4095        page_encoding_stats: &[PageEncodingStats],
4096    ) {
4097        let props = WriterProperties::builder()
4098            .set_writer_version(version)
4099            .set_dictionary_enabled(dict_enabled)
4100            .build();
4101        let meta = column_write_and_get_metadata::<T>(props, data);
4102        assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
4103        assert_eq!(meta.encodings(), encodings);
4104        assert_eq!(meta.page_encoding_stats().unwrap(), page_encoding_stats);
4105    }
4106
4107    /// Returns column writer.
4108    fn get_test_column_writer<'a, T: DataType>(
4109        page_writer: Box<dyn PageWriter + 'a>,
4110        max_def_level: i16,
4111        max_rep_level: i16,
4112        props: WriterPropertiesPtr,
4113    ) -> ColumnWriterImpl<'a, T> {
4114        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4115        let column_writer = get_column_writer(descr, props, page_writer);
4116        get_typed_column_writer::<T>(column_writer)
4117    }
4118
4119    /// Returns column reader.
4120    fn get_test_column_reader<T: DataType>(
4121        page_reader: Box<dyn PageReader>,
4122        max_def_level: i16,
4123        max_rep_level: i16,
4124    ) -> ColumnReaderImpl<T> {
4125        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4126        let column_reader = get_column_reader(descr, page_reader);
4127        get_typed_column_reader::<T>(column_reader)
4128    }
4129
4130    /// Returns descriptor for primitive column.
4131    fn get_test_column_descr<T: DataType>(
4132        max_def_level: i16,
4133        max_rep_level: i16,
4134    ) -> ColumnDescriptor {
4135        let path = ColumnPath::from("col");
4136        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4137            // length is set for "encoding support" tests for FIXED_LEN_BYTE_ARRAY type,
4138            // it should be no-op for other types
4139            .with_length(1)
4140            .build()
4141            .unwrap();
4142        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4143    }
4144
4145    /// Returns page writer that collects pages without serializing them.
4146    fn get_test_page_writer() -> Box<dyn PageWriter> {
4147        Box::new(TestPageWriter {})
4148    }
4149
4150    struct TestPageWriter {}
4151
4152    impl PageWriter for TestPageWriter {
4153        fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
4154            let mut res = PageWriteSpec::new();
4155            res.page_type = page.page_type();
4156            res.uncompressed_size = page.uncompressed_size();
4157            res.compressed_size = page.compressed_size();
4158            res.num_values = page.num_values();
4159            res.offset = 0;
4160            res.bytes_written = page.data().len() as u64;
4161            Ok(res)
4162        }
4163
4164        fn close(&mut self) -> Result<()> {
4165            Ok(())
4166        }
4167    }
4168
4169    /// Write data into parquet using [`get_test_page_writer`] and [`get_test_column_writer`] and returns generated statistics.
4170    fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics {
4171        let page_writer = get_test_page_writer();
4172        let props = Default::default();
4173        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4174        writer.write_batch(values, None, None).unwrap();
4175
4176        let metadata = writer.close().unwrap().metadata;
4177        if let Some(stats) = metadata.statistics() {
4178            stats.clone()
4179        } else {
4180            panic!("metadata missing statistics");
4181        }
4182    }
4183
4184    /// Returns Decimals column writer.
4185    fn get_test_decimals_column_writer<T: DataType>(
4186        page_writer: Box<dyn PageWriter>,
4187        max_def_level: i16,
4188        max_rep_level: i16,
4189        props: WriterPropertiesPtr,
4190    ) -> ColumnWriterImpl<'static, T> {
4191        let descr = Arc::new(get_test_decimals_column_descr::<T>(
4192            max_def_level,
4193            max_rep_level,
4194        ));
4195        let column_writer = get_column_writer(descr, props, page_writer);
4196        get_typed_column_writer::<T>(column_writer)
4197    }
4198
4199    /// Returns descriptor for Decimal type with primitive column.
4200    fn get_test_decimals_column_descr<T: DataType>(
4201        max_def_level: i16,
4202        max_rep_level: i16,
4203    ) -> ColumnDescriptor {
4204        let path = ColumnPath::from("col");
4205        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4206            .with_length(16)
4207            .with_logical_type(Some(LogicalType::Decimal {
4208                scale: 2,
4209                precision: 3,
4210            }))
4211            .with_scale(2)
4212            .with_precision(3)
4213            .build()
4214            .unwrap();
4215        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4216    }
4217
4218    fn float16_statistics_roundtrip(
4219        values: &[FixedLenByteArray],
4220    ) -> ValueStatistics<FixedLenByteArray> {
4221        let page_writer = get_test_page_writer();
4222        let mut writer = get_test_float16_column_writer(page_writer, Default::default());
4223        writer.write_batch(values, None, None).unwrap();
4224
4225        let metadata = writer.close().unwrap().metadata;
4226        if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
4227            stats.clone()
4228        } else {
4229            panic!("metadata missing statistics");
4230        }
4231    }
4232
4233    fn get_test_float16_column_writer(
4234        page_writer: Box<dyn PageWriter>,
4235        props: WriterPropertiesPtr,
4236    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4237        let descr = Arc::new(get_test_float16_column_descr(0, 0));
4238        let column_writer = get_column_writer(descr, props, page_writer);
4239        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4240    }
4241
4242    fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
4243        let path = ColumnPath::from("col");
4244        let tpe =
4245            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4246                .with_length(2)
4247                .with_logical_type(Some(LogicalType::Float16))
4248                .build()
4249                .unwrap();
4250        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4251    }
4252
4253    fn get_test_interval_column_writer(
4254        page_writer: Box<dyn PageWriter>,
4255    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4256        let descr = Arc::new(get_test_interval_column_descr());
4257        let column_writer = get_column_writer(descr, Default::default(), page_writer);
4258        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4259    }
4260
4261    fn get_test_interval_column_descr() -> ColumnDescriptor {
4262        let path = ColumnPath::from("col");
4263        let tpe =
4264            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4265                .with_length(12)
4266                .with_converted_type(ConvertedType::INTERVAL)
4267                .build()
4268                .unwrap();
4269        ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
4270    }
4271
4272    /// Returns column writer for UINT32 Column provided as ConvertedType only
4273    fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
4274        page_writer: Box<dyn PageWriter + 'a>,
4275        max_def_level: i16,
4276        max_rep_level: i16,
4277        props: WriterPropertiesPtr,
4278    ) -> ColumnWriterImpl<'a, T> {
4279        let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
4280            max_def_level,
4281            max_rep_level,
4282        ));
4283        let column_writer = get_column_writer(descr, props, page_writer);
4284        get_typed_column_writer::<T>(column_writer)
4285    }
4286
4287    /// Returns column descriptor for UINT32 Column provided as ConvertedType only
4288    fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
4289        max_def_level: i16,
4290        max_rep_level: i16,
4291    ) -> ColumnDescriptor {
4292        let path = ColumnPath::from("col");
4293        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4294            .with_converted_type(ConvertedType::UINT_32)
4295            .build()
4296            .unwrap();
4297        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4298    }
4299
4300    #[test]
4301    fn test_page_v2_snappy_compression_fallback() {
4302        // Test that PageV2 sets is_compressed to false when Snappy compression increases data size
4303        let page_writer = TestPageWriter {};
4304
4305        // Create WriterProperties with PageV2 and Snappy compression
4306        let props = WriterProperties::builder()
4307            .set_writer_version(WriterVersion::PARQUET_2_0)
4308            // Disable dictionary to ensure data is written directly
4309            .set_dictionary_enabled(false)
4310            .set_compression(Compression::SNAPPY)
4311            .build();
4312
4313        let mut column_writer =
4314            get_test_column_writer::<ByteArrayType>(Box::new(page_writer), 0, 0, Arc::new(props));
4315
4316        // Create small, simple data that Snappy compression will likely increase in size
4317        // due to compression overhead for very small data
4318        let values = vec![ByteArray::from("a")];
4319
4320        column_writer.write_batch(&values, None, None).unwrap();
4321
4322        let result = column_writer.close().unwrap();
4323        assert_eq!(
4324            result.metadata.uncompressed_size(),
4325            result.metadata.compressed_size()
4326        );
4327    }
4328}